// fast_telemetry/metric/dynamic/histogram.rs
1//! Runtime-labeled histogram for dynamic dimensions.
2
3use super::cache::{CacheableSeries, LabelCache, SERIES_CACHE_SIZE};
4#[cfg(feature = "eviction")]
5use super::current_cycle;
6use super::{DynamicLabelSet, HISTOGRAM_IDS, thread_id};
7use crossbeam_utils::CachePadded;
8use parking_lot::RwLock;
9use std::cell::RefCell;
10use std::collections::HashMap;
11use std::hash::{Hash, Hasher};
12#[cfg(feature = "eviction")]
13use std::sync::atomic::AtomicU32;
14use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicU64, AtomicUsize, Ordering};
15use std::sync::{Arc, Weak};
16
/// Default cap on distinct label sets before new sets are routed to overflow.
const DEFAULT_MAX_SERIES: usize = 2000;
/// Label key/value pair identifying the shared overflow series used once the
/// cardinality cap is reached.
const OVERFLOW_LABEL_KEY: &str = "__ft_overflow";
const OVERFLOW_LABEL_VALUE: &str = "true";

// One shard of the label-set -> series index; cache-padded so shard locks on
// adjacent entries do not share a cache line.
type HistogramIndexShard = CachePadded<RwLock<HashMap<DynamicLabelSet, Arc<HistogramSeries>>>>;
// Snapshot entry layout: (labels, cumulative (bound, count) buckets, sum, count).
type HistogramSnapshotEntry = (DynamicLabelSet, Vec<(u64, u64)>, u64, u64);
23
/// Counter split across per-shard cells: writers touch only their own
/// cache-padded cell (no cross-core contention); readers sum all cells.
struct ShardedCounter {
    // One cell per shard; CachePadded prevents false sharing between cells.
    cells: Vec<CachePadded<AtomicIsize>>,
}
27
28impl ShardedCounter {
29    fn new(shard_count: usize) -> Self {
30        Self {
31            cells: (0..shard_count)
32                .map(|_| CachePadded::new(AtomicIsize::new(0)))
33                .collect(),
34        }
35    }
36
37    #[inline]
38    fn add_at(&self, shard_idx: usize, value: isize) {
39        self.cells[shard_idx].fetch_add(value, Ordering::Relaxed);
40    }
41
42    #[inline]
43    fn inc_at(&self, shard_idx: usize) {
44        self.add_at(shard_idx, 1);
45    }
46
47    #[inline]
48    fn sum(&self) -> isize {
49        self.cells
50            .iter()
51            .map(|cell| cell.load(Ordering::Relaxed))
52            .sum()
53    }
54}
55
/// One histogram time series: shared bucket boundaries plus sharded counters.
struct HistogramSeries {
    /// Bucket upper bounds (upper-inclusive), shared with the parent histogram.
    bounds: Arc<Vec<u64>>,
    /// One counter per bucket: `bounds.len() + 1` entries, last is +Inf.
    buckets: Vec<ShardedCounter>,
    /// Running sum of all recorded values.
    sum: ShardedCounter,
    /// Running count of all recorded values.
    count: ShardedCounter,
    /// Tombstone flag set by exporter before removing from map.
    evicted: AtomicBool,
    /// Last export cycle when this series was accessed.
    #[cfg(feature = "eviction")]
    last_accessed_cycle: AtomicU32,
}
67
68impl HistogramSeries {
69    #[cfg(feature = "eviction")]
70    fn new(bounds: Arc<Vec<u64>>, shard_count: usize, cycle: u32) -> Self {
71        let buckets = (0..=bounds.len())
72            .map(|_| ShardedCounter::new(shard_count))
73            .collect();
74        Self {
75            bounds,
76            buckets,
77            sum: ShardedCounter::new(shard_count),
78            count: ShardedCounter::new(shard_count),
79            evicted: AtomicBool::new(false),
80            last_accessed_cycle: AtomicU32::new(cycle),
81        }
82    }
83
84    #[cfg(not(feature = "eviction"))]
85    fn new(bounds: Arc<Vec<u64>>, shard_count: usize) -> Self {
86        let buckets = (0..=bounds.len())
87            .map(|_| ShardedCounter::new(shard_count))
88            .collect();
89        Self {
90            bounds,
91            buckets,
92            sum: ShardedCounter::new(shard_count),
93            count: ShardedCounter::new(shard_count),
94            evicted: AtomicBool::new(false),
95        }
96    }
97
98    #[inline]
99    fn is_evicted(&self) -> bool {
100        self.evicted.load(Ordering::Relaxed)
101    }
102
103    #[cfg(feature = "eviction")]
104    fn mark_evicted(&self) {
105        self.evicted.store(true, Ordering::Relaxed);
106    }
107
108    #[inline]
109    fn record_at(&self, shard_idx: usize, value: u64) {
110        let bucket_idx = self
111            .bounds
112            .iter()
113            .position(|&bound| value <= bound)
114            .unwrap_or(self.bounds.len());
115        self.buckets[bucket_idx].inc_at(shard_idx);
116        self.sum.add_at(shard_idx, value as isize);
117        self.count.inc_at(shard_idx);
118        // Note: timestamp updated on slow path (lookup/cache miss) to avoid
119        // global atomic read on every record.
120    }
121
122    /// Touch the series timestamp. Called on slow path only.
123    #[cfg(feature = "eviction")]
124    #[inline]
125    fn touch(&self, cycle: u32) {
126        self.last_accessed_cycle.store(cycle, Ordering::Relaxed);
127    }
128
129    fn buckets_cumulative(&self) -> Vec<(u64, u64)> {
130        let mut result = Vec::with_capacity(self.buckets.len());
131        for (bound, cumulative) in self.buckets_cumulative_iter() {
132            result.push((bound, cumulative));
133        }
134        result
135    }
136
137    fn buckets_cumulative_iter(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
138        let mut cumulative = 0i64;
139        self.buckets.iter().enumerate().map(move |(i, counter)| {
140            cumulative += counter.sum() as i64;
141            let bound = if i < self.bounds.len() {
142                self.bounds[i]
143            } else {
144                u64::MAX
145            };
146            (bound, cumulative as u64)
147        })
148    }
149
150    fn sum(&self) -> u64 {
151        self.sum.sum() as u64
152    }
153
154    fn count(&self) -> u64 {
155        self.count.sum() as u64
156    }
157}
158
159impl CacheableSeries for HistogramSeries {
160    fn is_evicted(&self) -> bool {
161        self.is_evicted()
162    }
163}
164
/// A reusable handle to a dynamic-label histogram series.
///
/// Use this for hot paths to avoid per-update label canonicalization and map
/// lookups. Resolve once with `DynamicHistogram::series(...)`, then call
/// `record()` on the handle.
#[derive(Clone)]
pub struct DynamicHistogramSeries {
    // Keeps the series alive (strong ref) even if the exporter evicts it
    // from the index map.
    series: Arc<HistogramSeries>,
    // Copied from the parent histogram so `record()` can select a shard
    // without going back through it.
    shard_mask: usize,
}
175
/// Borrowed read-only view of a dynamic histogram series.
#[doc(hidden)]
pub struct DynamicHistogramSeriesView<'a> {
    // Borrowed from the index map; only valid while the shard read lock
    // held by `visit_series` is alive.
    series: &'a HistogramSeries,
}
181
182impl<'a> DynamicHistogramSeriesView<'a> {
183    /// Iterate cumulative `(bound, count)` buckets without allocating.
184    #[doc(hidden)]
185    pub fn buckets_cumulative_iter(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
186        self.series.buckets_cumulative_iter()
187    }
188
189    #[doc(hidden)]
190    pub fn sum(&self) -> u64 {
191        self.series.sum()
192    }
193
194    #[doc(hidden)]
195    pub fn count(&self) -> u64 {
196        self.series.count()
197    }
198}
199
200impl DynamicHistogramSeries {
201    /// Record a value in this histogram series.
202    #[inline]
203    pub fn record(&self, value: u64) {
204        let shard_idx = thread_id() & self.shard_mask;
205        self.series.record_at(shard_idx, value);
206    }
207
208    /// Get cumulative bucket counts.
209    pub fn buckets_cumulative(&self) -> Vec<(u64, u64)> {
210        self.series.buckets_cumulative()
211    }
212
213    /// Get the sum of all recorded values.
214    pub fn sum(&self) -> u64 {
215        self.series.sum()
216    }
217
218    /// Get the count of all recorded values.
219    pub fn count(&self) -> u64 {
220        self.series.count()
221    }
222
223    /// Check if this series handle has been evicted.
224    #[inline]
225    pub fn is_evicted(&self) -> bool {
226        self.series.is_evicted()
227    }
228}
229
thread_local! {
    // Per-thread cache from (histogram id, labels) to a weak series pointer;
    // lets the hot-path `record` skip label canonicalization and the sharded
    // index. Weak refs ensure the cache does not keep evicted series alive.
    static SERIES_CACHE: RefCell<LabelCache<Weak<HistogramSeries>, SERIES_CACHE_SIZE>> =
        RefCell::new(LabelCache::new());
}
234
/// Histogram keyed by runtime label sets.
///
/// Uses sharded index for key->series lookup and per-series sharded counters
/// for fast updates.
pub struct DynamicHistogram {
    /// Process-unique id; keys entries in the thread-local series cache.
    id: usize,
    /// Bucket upper bounds shared (via `Arc`) with every series.
    bounds: Arc<Vec<u64>>,
    /// Shard count, rounded up to a power of two at construction.
    shard_count: usize,
    /// Cardinality cap; `0` disables the overflow redirect.
    max_series: usize,
    /// `shard_count - 1`; valid bitmask because shard_count is a power of two.
    shard_mask: usize,
    /// Sharded label-set -> series index.
    index_shards: Vec<HistogramIndexShard>,
    /// Approximate number of live series (incremented on insert, decremented on evict).
    series_count: AtomicUsize,
    /// Count of records routed to overflow bucket due to cardinality cap.
    overflow_count: AtomicU64,
}
251
impl DynamicHistogram {
    /// Creates a new runtime-labeled histogram with given bucket boundaries.
    ///
    /// Applies the default cardinality cap (`DEFAULT_MAX_SERIES`).
    pub fn new(bounds: &[u64], shard_count: usize) -> Self {
        Self::with_limits(bounds, shard_count, DEFAULT_MAX_SERIES)
    }

    /// Creates a new runtime-labeled histogram with a series cardinality cap.
    ///
    /// When the number of unique label sets approximately reaches `max_series`,
    /// new label sets are redirected into a single overflow series
    /// (`__ft_overflow=true`). The cap is checked via a lock-free atomic counter,
    /// so concurrent inserts may briefly overshoot by the number of in-flight
    /// writers before the overflow kicks in.
    pub fn with_limits(bounds: &[u64], shard_count: usize, max_series: usize) -> Self {
        // Round up so shard selection is a single bitmask (`& shard_mask`).
        let shard_count = shard_count.next_power_of_two();
        // Process-unique id used to key the thread-local series cache.
        let id = HISTOGRAM_IDS.fetch_add(1, Ordering::Relaxed);
        Self {
            id,
            bounds: Arc::new(bounds.to_vec()),
            shard_count,
            max_series,
            shard_mask: shard_count - 1,
            index_shards: (0..shard_count)
                .map(|_| CachePadded::new(RwLock::new(HashMap::new())))
                .collect(),
            series_count: AtomicUsize::new(0),
            overflow_count: AtomicU64::new(0),
        }
    }

    /// Creates a histogram with default latency buckets (in microseconds).
    pub fn with_latency_buckets(shard_count: usize) -> Self {
        Self::with_limits(
            &[
                10,         // 10µs
                50,         // 50µs
                100,        // 100µs
                500,        // 500µs
                1_000,      // 1ms
                5_000,      // 5ms
                10_000,     // 10ms
                50_000,     // 50ms
                100_000,    // 100ms
                500_000,    // 500ms
                1_000_000,  // 1s
                5_000_000,  // 5s
                10_000_000, // 10s
            ],
            shard_count,
            DEFAULT_MAX_SERIES,
        )
    }

    /// Resolve a reusable series handle for `labels`.
    ///
    /// Preferred for hot paths when labels come from a finite active set.
    pub fn series(&self, labels: &[(&str, &str)]) -> DynamicHistogramSeries {
        // Fast path: thread-local cache hit skips canonicalization and index.
        if let Some(series) = self.cached_series(labels) {
            return DynamicHistogramSeries {
                series,
                shard_mask: self.shard_mask,
            };
        }
        // Slow path: resolve (or create) in the sharded index, warm the cache.
        let series = self.lookup_or_create(labels);
        self.update_cache(labels, &series);
        DynamicHistogramSeries {
            series,
            shard_mask: self.shard_mask,
        }
    }

    /// Record a value for the series identified by `labels`.
    #[inline]
    pub fn record(&self, labels: &[(&str, &str)], value: u64) {
        // Fast path: cached series -> direct sharded write, no locks.
        if let Some(series) = self.cached_series(labels) {
            let shard_idx = thread_id() & self.shard_mask;
            series.record_at(shard_idx, value);
            return;
        }

        // Slow path: index lookup/creation, then warm the cache for next time.
        let series = self.lookup_or_create(labels);
        self.update_cache(labels, &series);
        let shard_idx = thread_id() & self.shard_mask;
        series.record_at(shard_idx, value);
    }

    /// Get cumulative bucket counts for the series identified by `labels`.
    ///
    /// Returns an empty vec if no series exists for `labels`.
    pub fn buckets_cumulative(&self, labels: &[(&str, &str)]) -> Vec<(u64, u64)> {
        let key = DynamicLabelSet::from_pairs(labels);
        let index_shard = self.index_shard_for(&key);
        self.index_shards[index_shard]
            .read()
            .get(&key)
            .map(|series| series.buckets_cumulative())
            .unwrap_or_default()
    }

    /// Get sum for the series identified by `labels` (0 if absent).
    pub fn sum(&self, labels: &[(&str, &str)]) -> u64 {
        let key = DynamicLabelSet::from_pairs(labels);
        let index_shard = self.index_shard_for(&key);
        self.index_shards[index_shard]
            .read()
            .get(&key)
            .map(|series| series.sum())
            .unwrap_or(0)
    }

    /// Get count for the series identified by `labels` (0 if absent).
    pub fn count(&self, labels: &[(&str, &str)]) -> u64 {
        let key = DynamicLabelSet::from_pairs(labels);
        let index_shard = self.index_shard_for(&key);
        self.index_shards[index_shard]
            .read()
            .get(&key)
            .map(|series| series.count())
            .unwrap_or(0)
    }

    /// Returns a snapshot of all label-set with their histogram data.
    ///
    /// Clones every label set and allocates a bucket vec per series; prefer
    /// `visit_series` in exporters that run frequently.
    pub fn snapshot(&self) -> Vec<HistogramSnapshotEntry> {
        let mut out = Vec::new();
        for shard in &self.index_shards {
            let guard = shard.read();
            for (labels, series) in guard.iter() {
                out.push((
                    labels.clone(),
                    series.buckets_cumulative(),
                    series.sum(),
                    series.count(),
                ));
            }
        }
        out
    }

    /// Returns the current number of distinct label sets.
    pub fn cardinality(&self) -> usize {
        // Sums shard sizes under successive read locks; approximate while
        // concurrent inserts/evictions are in flight.
        self.index_shards
            .iter()
            .map(|shard| shard.read().len())
            .sum()
    }

    /// Returns the number of records routed to the overflow bucket.
    ///
    /// A non-zero value indicates the cardinality cap was hit and label
    /// fidelity is being lost. Use this to alert on cardinality pressure.
    pub fn overflow_count(&self) -> u64 {
        self.overflow_count.load(Ordering::Relaxed)
    }

    /// Iterate all series without cloning label sets.
    ///
    /// Calls `f` with borrowed label pairs and a borrowed series view.
    /// Used by exporters/macros to avoid `snapshot()` and bucket vec allocations.
    #[doc(hidden)]
    pub fn visit_series<F>(&self, mut f: F)
    where
        F: for<'a> FnMut(&'a [(String, String)], DynamicHistogramSeriesView<'a>),
    {
        for shard in &self.index_shards {
            // Read lock is held for the duration of each shard's callbacks.
            let guard = shard.read();
            for (labels, series) in guard.iter() {
                f(labels.pairs(), DynamicHistogramSeriesView { series });
            }
        }
    }

    /// Evict series that haven't been accessed for `max_staleness` cycles.
    ///
    /// Call this after `advance_cycle()` in your exporter task.
    /// Series are marked as evicted (so cached handles see the tombstone),
    /// then removed from the index.
    ///
    /// Protected series (Arc::strong_count > 1) are never evicted - someone
    /// holds a DynamicHistogramSeries handle to them.
    ///
    /// Returns the number of series evicted.
    #[cfg(feature = "eviction")]
    pub fn evict_stale(&self, max_staleness: u32) -> usize {
        let cycle = current_cycle();
        let mut removed = 0;

        for shard in &self.index_shards {
            let mut guard = shard.write();
            guard.retain(|_labels, series| {
                // Protected if someone holds a handle (strong_count > 1 means
                // both the map and at least one DynamicHistogramSeries hold refs)
                if Arc::strong_count(series) > 1 {
                    return true;
                }
                // Otherwise check timestamp staleness
                let last = series.last_accessed_cycle.load(Ordering::Relaxed);
                let stale = cycle.saturating_sub(last) > max_staleness;
                if stale {
                    // Tombstone first so weakly-cached handles observe it.
                    series.mark_evicted();
                    removed += 1;
                    self.series_count.fetch_sub(1, Ordering::Relaxed);
                }
                !stale
            });
        }

        removed
    }

    /// Look up the series for `labels`, creating it (or redirecting to the
    /// overflow series) if absent. Uses read-lock fast path, then a
    /// double-checked write-lock insert.
    fn lookup_or_create(&self, labels: &[(&str, &str)]) -> Arc<HistogramSeries> {
        let requested_key = DynamicLabelSet::from_pairs(labels);
        let requested_shard = self.index_shard_for(&requested_key);
        #[cfg(feature = "eviction")]
        let cycle = current_cycle();

        // Fast path: read lock only.
        if let Some(series) = self.index_shards[requested_shard]
            .read()
            .get(&requested_key)
        {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }

        // Check cardinality cap BEFORE taking any write lock (lock-free).
        let key = if self.max_series > 0
            && self.series_count.load(Ordering::Relaxed) >= self.max_series
        {
            // At capacity: route this record to the shared overflow series.
            self.overflow_count.fetch_add(1, Ordering::Relaxed);
            DynamicLabelSet::from_pairs(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)])
        } else {
            requested_key
        };
        // Re-shard: the overflow key may live in a different shard.
        let shard = self.index_shard_for(&key);

        // Second read-lock check under the (possibly new) key.
        if let Some(series) = self.index_shards[shard].read().get(&key) {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }

        // Write lock + re-check to resolve races with concurrent creators.
        let mut guard = self.index_shards[shard].write();
        if let Some(series) = guard.get(&key) {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }
        #[cfg(feature = "eviction")]
        let series = Arc::new(HistogramSeries::new(
            Arc::clone(&self.bounds),
            self.shard_count,
            cycle,
        ));
        #[cfg(not(feature = "eviction"))]
        let series = Arc::new(HistogramSeries::new(
            Arc::clone(&self.bounds),
            self.shard_count,
        ));
        guard.insert(key, Arc::clone(&series));
        self.series_count.fetch_add(1, Ordering::Relaxed);
        series
    }

    /// Map a label set to its index shard by hashing the canonical key.
    fn index_shard_for(&self, key: &DynamicLabelSet) -> usize {
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        key.hash(&mut hasher);
        (hasher.finish() as usize) & self.shard_mask
    }

    /// Thread-local cache lookup; touches the series timestamp on hit so
    /// cache-served series are not considered stale by the evictor.
    fn cached_series(&self, labels: &[(&str, &str)]) -> Option<Arc<HistogramSeries>> {
        SERIES_CACHE.with(|cache| {
            let series = cache.borrow_mut().get(self.id, labels)?;
            #[cfg(feature = "eviction")]
            series.touch(current_cycle());
            Some(series)
        })
    }

    /// Insert a weak series pointer into the thread-local cache.
    fn update_cache(&self, labels: &[(&str, &str)], series: &Arc<HistogramSeries>) {
        SERIES_CACHE.with(|cache| {
            cache
                .borrow_mut()
                .insert(self.id, labels, Arc::downgrade(series));
        });
    }
}
537
#[cfg(test)]
mod tests {
    use super::*;

    // Values land in the correct upper-inclusive buckets; cumulative counts,
    // sum and count all agree.
    #[test]
    fn test_basic_recording() {
        let h = DynamicHistogram::new(&[10, 100, 1000], 4);
        let labels = &[("org_id", "42")];

        h.record(labels, 5); // bucket 0 (≤10)
        h.record(labels, 50); // bucket 1 (≤100)
        h.record(labels, 500); // bucket 2 (≤1000)
        h.record(labels, 5000); // bucket 3 (+Inf)

        let buckets = h.buckets_cumulative(labels);
        assert_eq!(buckets.len(), 4);
        assert_eq!(buckets[0], (10, 1));
        assert_eq!(buckets[1], (100, 2));
        assert_eq!(buckets[2], (1000, 3));
        assert_eq!(buckets[3], (u64::MAX, 4));

        assert_eq!(h.count(labels), 4);
        assert_eq!(h.sum(labels), 5 + 50 + 500 + 5000);
    }

    // The same labels in a different order must resolve to the same series.
    #[test]
    fn test_label_order_is_canonicalized() {
        let h = DynamicHistogram::new(&[10, 100], 4);

        h.record(&[("org_id", "42"), ("endpoint", "abc")], 5);

        assert_eq!(h.count(&[("endpoint", "abc"), ("org_id", "42")]), 1);
    }

    // A resolved handle records into the same series the parent sees.
    #[test]
    fn test_series_handle() {
        let h = DynamicHistogram::new(&[10, 100, 1000], 4);
        let series = h.series(&[("org_id", "42")]);

        series.record(5);
        series.record(50);
        series.record(500);

        assert_eq!(series.count(), 3);
        assert_eq!(series.sum(), 555);
        assert_eq!(h.count(&[("org_id", "42")]), 3);
    }

    // Distinct label sets are tracked independently and both appear in a
    // snapshot.
    #[test]
    fn test_multiple_label_sets() {
        let h = DynamicHistogram::new(&[100], 4);

        h.record(&[("org_id", "1")], 50);
        h.record(&[("org_id", "2")], 150);

        assert_eq!(h.count(&[("org_id", "1")]), 1);
        assert_eq!(h.count(&[("org_id", "2")]), 1);

        let snap = h.snapshot();
        assert_eq!(snap.len(), 2);
    }

    // With cap = 1, the second distinct label set is redirected to the
    // overflow series (cardinality 2 = original series + overflow series).
    #[test]
    fn test_overflow_bucket_routes_new_series_at_capacity() {
        let h = DynamicHistogram::with_limits(&[100], 4, 1);

        h.record(&[("org_id", "1")], 50);
        h.record(&[("org_id", "2")], 150);

        assert_eq!(h.cardinality(), 2);
        assert_eq!(h.count(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)]), 1);
        assert_eq!(h.sum(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)]), 150);
    }

    // The lock-free cap check may overshoot, but only by a bounded amount:
    // at most one in-flight insert per racing thread, plus the overflow series.
    #[test]
    fn test_concurrent_cap_bounded_overshoot() {
        use std::sync::{Arc, Barrier};
        use std::thread;

        let cap = 10;
        let threads = 16;
        let h = Arc::new(DynamicHistogram::with_limits(&[100, 1000], 4, cap));
        let barrier = Arc::new(Barrier::new(threads));

        let handles: Vec<_> = (0..threads)
            .map(|t| {
                let h = Arc::clone(&h);
                let barrier = Arc::clone(&barrier);
                thread::spawn(move || {
                    // Barrier maximizes the racing window on the cap check.
                    barrier.wait();
                    for i in 0..5 {
                        let label = format!("t{t}_s{i}");
                        h.record(&[("key", &label)], 42);
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        let card = h.cardinality();
        assert!(
            card <= cap + threads + 1,
            "cardinality {card} exceeded bounded overshoot (cap={cap}, threads={threads})"
        );
        assert!(h.overflow_count() > 0, "overflow should have triggered");
    }
}