// fast_telemetry/metric/dynamic/distribution.rs

//! Runtime-labeled distribution for dynamic dimensions.
//!
//! Uses base-2 exponential histogram buckets per label set, matching the
//! `Distribution` implementation. Each label set × thread gets its own
//! fixed-size bucket array.

#[cfg(feature = "eviction")]
use super::current_cycle;
use super::{DISTRIBUTION_IDS, DynamicLabelSet};
use crate::exp_buckets::{ExpBuckets, ExpBucketsSnapshot};
use crossbeam_utils::CachePadded;
use parking_lot::{Mutex, RwLock};
use std::cell::RefCell;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
#[cfg(feature = "eviction")]
use std::sync::atomic::AtomicU32;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Weak};

/// Default cap on distinct label sets before new series spill into the overflow bucket.
const DEFAULT_MAX_SERIES: usize = 2000;
/// Reserved label key/value pair identifying the single overflow series.
const OVERFLOW_LABEL_KEY: &str = "__ft_overflow";
const OVERFLOW_LABEL_VALUE: &str = "true";
/// One index shard: label set -> series, cache-padded to avoid false sharing
/// between shard locks.
type DistributionIndexShard =
    CachePadded<RwLock<HashMap<DynamicLabelSet, Arc<DistributionSeries>>>>;
/// Snapshot row: (labels, count, sum, aggregated buckets).
type DistributionSnapshotEntry = (DynamicLabelSet, u64, u64, ExpBucketsSnapshot);
/// Process-wide allocator of unique series ids (starts at 1).
static SERIES_IDS: AtomicUsize = AtomicUsize::new(1);

/// Per-label-set series state.
///
/// Holds one `ExpBuckets` buffer per recording thread; aggregate stats are
/// computed by summing across all registered buffers.
struct DistributionSeries {
    // Process-unique id (from SERIES_IDS); keys the thread-local buffer cache.
    id: usize,
    // Every per-thread bucket buffer ever created for this series.
    registry: Mutex<Vec<Arc<ExpBuckets>>>,
    /// Tombstone flag set by exporter before removing from map.
    evicted: AtomicBool,
    /// Last export cycle when this series was accessed.
    #[cfg(feature = "eviction")]
    last_accessed_cycle: AtomicU32,
}

40impl DistributionSeries {
41    #[cfg(feature = "eviction")]
42    fn new(cycle: u32) -> Self {
43        Self {
44            id: SERIES_IDS.fetch_add(1, Ordering::Relaxed),
45            registry: Mutex::new(Vec::new()),
46            evicted: AtomicBool::new(false),
47            last_accessed_cycle: AtomicU32::new(cycle),
48        }
49    }
50
51    #[cfg(not(feature = "eviction"))]
52    fn new() -> Self {
53        Self {
54            id: SERIES_IDS.fetch_add(1, Ordering::Relaxed),
55            registry: Mutex::new(Vec::new()),
56            evicted: AtomicBool::new(false),
57        }
58    }
59
60    /// Touch the series timestamp. Called on slow path only.
61    #[cfg(feature = "eviction")]
62    #[inline]
63    fn touch(&self, cycle: u32) {
64        self.last_accessed_cycle.store(cycle, Ordering::Relaxed);
65    }
66
67    #[inline]
68    fn is_evicted(&self) -> bool {
69        self.evicted.load(Ordering::Relaxed)
70    }
71
72    #[cfg(feature = "eviction")]
73    fn mark_evicted(&self) {
74        self.evicted.store(true, Ordering::Relaxed);
75    }
76
77    fn get_or_create_buf(&self) -> Arc<ExpBuckets> {
78        let buf = Arc::new(ExpBuckets::new());
79        self.registry.lock().push(Arc::clone(&buf));
80        buf
81    }
82
83    fn count(&self) -> u64 {
84        self.registry.lock().iter().map(|buf| buf.get_count()).sum()
85    }
86
87    fn sum(&self) -> u64 {
88        self.registry.lock().iter().map(|buf| buf.get_sum()).sum()
89    }
90
91    fn buckets_snapshot(&self) -> ExpBucketsSnapshot {
92        let mut positive = [0u64; 64];
93        let mut zero_count = 0u64;
94        let mut sum = 0u64;
95        let mut count = 0u64;
96
97        let registry = self.registry.lock();
98        for buf in registry.iter() {
99            let thread_buckets = buf.get_positive_buckets();
100            for (i, &c) in thread_buckets.iter().enumerate() {
101                positive[i] += c;
102            }
103            zero_count += buf.get_zero_count();
104            sum += buf.get_sum();
105            count += buf.get_count();
106        }
107
108        ExpBucketsSnapshot {
109            positive,
110            zero_count,
111            sum,
112            count,
113        }
114    }
115}
116
/// A reusable handle to a dynamic-label distribution series.
///
/// Use this for hot paths to avoid per-update label canonicalization and map
/// lookups. Resolve once with `DynamicDistribution::series(...)`, then call
/// `record()` on the handle.
#[derive(Clone)]
pub struct DynamicDistributionSeries {
    // Strong ref keeps the series protected from eviction while any handle is
    // alive (the sweeper skips series with Arc::strong_count > 1).
    series: Arc<DistributionSeries>,
    // Bucket buffer resolved for the thread that created the handle.
    // NOTE(review): a cloned handle moved to another thread keeps writing this
    // same buffer — assumes ExpBuckets tolerates cross-thread writers; confirm.
    buf: Arc<ExpBuckets>,
}

128impl DynamicDistributionSeries {
129    /// Record a value.
130    #[inline]
131    pub fn record(&self, value: u64) {
132        self.buf.record(value);
133    }
134
135    /// Get the count across all threads for this series.
136    pub fn count(&self) -> u64 {
137        self.series.count()
138    }
139
140    /// Get the sum across all threads for this series.
141    pub fn sum(&self) -> u64 {
142        self.series.sum()
143    }
144
145    /// Check if this series handle has been evicted.
146    #[inline]
147    pub fn is_evicted(&self) -> bool {
148        self.series.is_evicted()
149    }
150}
151
/// One-slot thread-local memo of the most recent (labels -> series)
/// resolution for a single distribution.
struct SeriesCacheEntry {
    // Which DynamicDistribution this entry belongs to (ids are process-unique).
    distribution_id: usize,
    // Labels in the exact order the caller passed them; the hit test compares
    // order-sensitively, so a reordered-but-equal label set is a cache miss.
    ordered_labels: Vec<(String, String)>,
    // Weak so the cache never keeps an evicted series alive.
    series: Weak<DistributionSeries>,
    // Strong ref to this thread's buffer; the series registry also holds one.
    buf: Arc<ExpBuckets>,
}

thread_local! {
    // Last (distribution, labels) -> series resolution made on this thread.
    static SERIES_CACHE: RefCell<Option<SeriesCacheEntry>> = const { RefCell::new(None) };
    // (distribution id, series id) -> this thread's bucket buffer, ensuring a
    // series gets at most one buffer per thread.
    static SERIES_BUF_CACHE: RefCell<Vec<(usize, usize, Weak<ExpBuckets>)>> = const { RefCell::new(Vec::new()) };
}

/// Distribution keyed by runtime label sets.
///
/// Each label set gets its own set of thread-local exponential histogram buckets.
pub struct DynamicDistribution {
    // Process-unique id (from DISTRIBUTION_IDS); keys the thread-local caches.
    id: usize,
    // Cardinality cap; 0 disables the cap.
    max_series: usize,
    // shard_count - 1 (shard_count is a power of two), for masking hashes.
    shard_mask: usize,
    // Sharded label-set -> series index; sharding reduces RwLock contention.
    index_shards: Vec<DistributionIndexShard>,
    /// Approximate number of live series (incremented on insert, decremented on evict).
    series_count: AtomicUsize,
    /// Count of records routed to overflow bucket due to cardinality cap.
    overflow_count: AtomicU64,
}

178impl DynamicDistribution {
179    /// Creates a new runtime-labeled distribution with default cardinality cap.
180    pub fn new(shard_count: usize) -> Self {
181        Self::with_max_series(shard_count, DEFAULT_MAX_SERIES)
182    }
183
184    /// Creates a new runtime-labeled distribution with a custom cardinality cap.
185    pub fn with_max_series(shard_count: usize, max_series: usize) -> Self {
186        let shard_count = shard_count.next_power_of_two();
187        let id = DISTRIBUTION_IDS.fetch_add(1, Ordering::Relaxed);
188        Self {
189            id,
190            max_series,
191            shard_mask: shard_count - 1,
192            index_shards: (0..shard_count)
193                .map(|_| CachePadded::new(RwLock::new(HashMap::new())))
194                .collect(),
195            series_count: AtomicUsize::new(0),
196            overflow_count: AtomicU64::new(0),
197        }
198    }
199
200    /// Resolve a reusable series handle for `labels`.
201    ///
202    /// Preferred for hot paths when labels come from a finite active set.
203    pub fn series(&self, labels: &[(&str, &str)]) -> DynamicDistributionSeries {
204        if let Some((series, buf)) = self.cached_series(labels) {
205            return DynamicDistributionSeries { series, buf };
206        }
207        let series = self.lookup_or_create(labels);
208        let buf = self.get_or_create_thread_buf(&series);
209        self.update_cache(labels, Arc::clone(&series), Arc::clone(&buf));
210        DynamicDistributionSeries { series, buf }
211    }
212
213    /// Record a value for the series identified by `labels`.
214    #[inline]
215    pub fn record(&self, labels: &[(&str, &str)], value: u64) {
216        if let Some((_series, buf)) = self.cached_series(labels) {
217            buf.record(value);
218            return;
219        }
220
221        let series = self.lookup_or_create(labels);
222        let buf = self.get_or_create_thread_buf(&series);
223        self.update_cache(labels, Arc::clone(&series), Arc::clone(&buf));
224        buf.record(value);
225    }
226
227    /// Get count for the series identified by `labels`.
228    pub fn count(&self, labels: &[(&str, &str)]) -> u64 {
229        let key = DynamicLabelSet::from_pairs(labels);
230        let index_shard = self.index_shard_for(&key);
231        self.index_shards[index_shard]
232            .read()
233            .get(&key)
234            .map(|series| series.count())
235            .unwrap_or(0)
236    }
237
238    /// Get sum for the series identified by `labels`.
239    pub fn sum(&self, labels: &[(&str, &str)]) -> u64 {
240        let key = DynamicLabelSet::from_pairs(labels);
241        let index_shard = self.index_shard_for(&key);
242        self.index_shards[index_shard]
243            .read()
244            .get(&key)
245            .map(|series| series.sum())
246            .unwrap_or(0)
247    }
248
249    /// Returns a snapshot of all label-sets with their stats.
250    pub fn snapshot(&self) -> Vec<DistributionSnapshotEntry> {
251        let mut out = Vec::new();
252        for shard in &self.index_shards {
253            let guard = shard.read();
254            for (labels, series) in guard.iter() {
255                let snap = series.buckets_snapshot();
256                out.push((labels.clone(), snap.count, snap.sum, snap));
257            }
258        }
259        out
260    }
261
262    /// Returns the current number of distinct label sets.
263    pub fn cardinality(&self) -> usize {
264        self.index_shards
265            .iter()
266            .map(|shard| shard.read().len())
267            .sum()
268    }
269
270    /// Returns the number of records routed to the overflow bucket.
271    ///
272    /// A non-zero value indicates the cardinality cap was hit and label
273    /// fidelity is being lost. Use this to alert on cardinality pressure.
274    pub fn overflow_count(&self) -> u64 {
275        self.overflow_count.load(Ordering::Relaxed)
276    }
277
278    /// Iterate all series without cloning label sets.
279    ///
280    /// Calls `f` with borrowed label pairs, count, sum, and bucket snapshot
281    /// for each series. Used by exporters/macros to avoid `snapshot()` cloning.
282    #[doc(hidden)]
283    pub fn visit_series(
284        &self,
285        mut f: impl FnMut(&[(String, String)], u64, u64, ExpBucketsSnapshot),
286    ) {
287        for shard in &self.index_shards {
288            let guard = shard.read();
289            for (labels, series) in guard.iter() {
290                let snap = series.buckets_snapshot();
291                f(labels.pairs(), snap.count, snap.sum, snap);
292            }
293        }
294    }
295
296    /// Evict series that haven't been accessed for `max_staleness` cycles.
297    ///
298    /// Call this after `advance_cycle()` in your sweeper task.
299    /// Series are marked as evicted (so cached handles see the tombstone),
300    /// then removed from the index.
301    ///
302    /// Protected series (Arc::strong_count > 1) are never evicted — someone
303    /// holds a DynamicDistributionSeries handle to them.
304    ///
305    /// Returns the number of series evicted.
306    #[cfg(feature = "eviction")]
307    pub fn evict_stale(&self, max_staleness: u32) -> usize {
308        let cycle = current_cycle();
309        let mut removed = 0;
310
311        for shard in &self.index_shards {
312            let mut guard = shard.write();
313            guard.retain(|_labels, series| {
314                // Protected if someone holds a handle (strong_count > 1 means
315                // both the map and at least one DynamicDistributionSeries hold refs)
316                if Arc::strong_count(series) > 1 {
317                    return true;
318                }
319                // Otherwise check timestamp staleness
320                let last = series.last_accessed_cycle.load(Ordering::Relaxed);
321                let stale = cycle.saturating_sub(last) > max_staleness;
322                if stale {
323                    series.mark_evicted();
324                    removed += 1;
325                    self.series_count.fetch_sub(1, Ordering::Relaxed);
326                }
327                !stale
328            });
329        }
330
331        removed
332    }
333
334    fn lookup_or_create(&self, labels: &[(&str, &str)]) -> Arc<DistributionSeries> {
335        let requested_key = DynamicLabelSet::from_pairs(labels);
336        let requested_shard = self.index_shard_for(&requested_key);
337        #[cfg(feature = "eviction")]
338        let cycle = current_cycle();
339
340        // Fast path: read lock only.
341        if let Some(series) = self.index_shards[requested_shard]
342            .read()
343            .get(&requested_key)
344        {
345            #[cfg(feature = "eviction")]
346            series.touch(cycle);
347            return Arc::clone(series);
348        }
349
350        // Check cardinality cap BEFORE taking any write lock (lock-free).
351        let key = if self.max_series > 0
352            && self.series_count.load(Ordering::Relaxed) >= self.max_series
353        {
354            self.overflow_count.fetch_add(1, Ordering::Relaxed);
355            DynamicLabelSet::from_pairs(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)])
356        } else {
357            requested_key
358        };
359        let shard = self.index_shard_for(&key);
360
361        if let Some(series) = self.index_shards[shard].read().get(&key) {
362            #[cfg(feature = "eviction")]
363            series.touch(cycle);
364            return Arc::clone(series);
365        }
366
367        let mut guard = self.index_shards[shard].write();
368        if let Some(series) = guard.get(&key) {
369            #[cfg(feature = "eviction")]
370            series.touch(cycle);
371            return Arc::clone(series);
372        }
373        #[cfg(feature = "eviction")]
374        let series = Arc::new(DistributionSeries::new(cycle));
375        #[cfg(not(feature = "eviction"))]
376        let series = Arc::new(DistributionSeries::new());
377        guard.insert(key, Arc::clone(&series));
378        self.series_count.fetch_add(1, Ordering::Relaxed);
379        series
380    }
381
382    fn index_shard_for(&self, key: &DynamicLabelSet) -> usize {
383        let mut hasher = std::collections::hash_map::DefaultHasher::new();
384        key.hash(&mut hasher);
385        (hasher.finish() as usize) & self.shard_mask
386    }
387
388    fn cached_series(
389        &self,
390        labels: &[(&str, &str)],
391    ) -> Option<(Arc<DistributionSeries>, Arc<ExpBuckets>)> {
392        SERIES_CACHE.with(|cache| {
393            let cache_ref = cache.borrow();
394            let entry = cache_ref.as_ref()?;
395            if entry.distribution_id != self.id {
396                return None;
397            }
398            if entry.ordered_labels.len() != labels.len() {
399                return None;
400            }
401            for (idx, (k, v)) in labels.iter().enumerate() {
402                let (ek, ev) = &entry.ordered_labels[idx];
403                if ek != k || ev != v {
404                    return None;
405                }
406            }
407            let series = entry.series.upgrade()?;
408            // Check tombstone - forces re-lookup if series was evicted
409            if series.is_evicted() {
410                return None;
411            }
412            #[cfg(feature = "eviction")]
413            series.touch(current_cycle());
414            Some((series, Arc::clone(&entry.buf)))
415        })
416    }
417
418    fn update_cache(
419        &self,
420        labels: &[(&str, &str)],
421        series: Arc<DistributionSeries>,
422        buf: Arc<ExpBuckets>,
423    ) {
424        SERIES_CACHE.with(|cache| {
425            let ordered_labels = labels
426                .iter()
427                .map(|(k, v)| ((*k).to_string(), (*v).to_string()))
428                .collect();
429            *cache.borrow_mut() = Some(SeriesCacheEntry {
430                distribution_id: self.id,
431                ordered_labels,
432                series: Arc::downgrade(&series),
433                buf,
434            });
435        });
436    }
437
438    fn get_or_create_thread_buf(&self, series: &Arc<DistributionSeries>) -> Arc<ExpBuckets> {
439        let dist_id = self.id;
440        let series_id = series.id;
441
442        SERIES_BUF_CACHE.with(|cache| {
443            let mut entries = cache.borrow_mut();
444            entries.retain(|(_id, _ptr, weak)| weak.strong_count() > 0);
445
446            for (id, ptr, weak) in entries.iter() {
447                if *id == dist_id
448                    && *ptr == series_id
449                    && let Some(buf) = weak.upgrade()
450                {
451                    return buf;
452                }
453            }
454
455            let buf = series.get_or_create_buf();
456            entries.push((dist_id, series_id, Arc::downgrade(&buf)));
457            buf
458        })
459    }
460}
461
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_basic_recording() {
        let hist = DynamicDistribution::new(4);
        let labels = &[("org_id", "42")];

        for value in [100u64, 200, 300] {
            hist.record(labels, value);
        }

        assert_eq!(hist.sum(labels), 600);
        assert_eq!(hist.count(labels), 3);
    }

    #[test]
    fn test_label_order_is_canonicalized() {
        let hist = DynamicDistribution::new(4);

        // Written with one label ordering, read back with the reverse:
        // both must resolve to the same series.
        hist.record(&[("org_id", "42"), ("endpoint", "abc")], 100);
        assert_eq!(hist.count(&[("endpoint", "abc"), ("org_id", "42")]), 1);
    }

    #[test]
    fn test_series_handle() {
        let hist = DynamicDistribution::new(4);
        let handle = hist.series(&[("org_id", "42")]);

        handle.record(100);
        handle.record(200);

        assert_eq!(handle.sum(), 300);
        assert_eq!(handle.count(), 2);
        // The handle and the map-based lookup observe the same series.
        assert_eq!(hist.count(&[("org_id", "42")]), 2);
    }

    #[test]
    fn test_multiple_label_sets() {
        let hist = DynamicDistribution::new(4);

        hist.record(&[("org_id", "1")], 100);
        hist.record(&[("org_id", "2")], 200);

        assert_eq!(hist.count(&[("org_id", "1")]), 1);
        assert_eq!(hist.count(&[("org_id", "2")]), 1);
        assert_eq!(hist.snapshot().len(), 2);
    }

    #[test]
    fn test_overflow_bucket_routes_new_series_at_capacity() {
        let hist = DynamicDistribution::with_max_series(4, 2);
        let overflow_labels = &[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)];

        hist.record(&[("org_id", "1")], 100);
        hist.record(&[("org_id", "2")], 200);
        // A third distinct label set exceeds the cap and spills over.
        hist.record(&[("org_id", "3")], 300);

        assert_eq!(hist.cardinality(), 3); // 2 real series + 1 overflow
        assert!(hist.overflow_count() > 0);
        assert_eq!(hist.count(overflow_labels), 1);
        assert_eq!(hist.sum(overflow_labels), 300);
    }

    #[test]
    fn test_snapshot_includes_buckets() {
        let hist = DynamicDistribution::new(4);
        hist.record(&[("org_id", "1")], 100);
        hist.record(&[("org_id", "1")], 200);

        let snap = hist.snapshot();
        assert_eq!(snap.len(), 1);
        let (_, count, sum, buckets) = &snap[0];
        assert_eq!(*count, 2);
        assert_eq!(*sum, 300);
        // Both 100 and 200 land in bucket 6 and 7 respectively
        assert!(buckets.positive[6] > 0 || buckets.positive[7] > 0);
    }
}