// fast_telemetry/metric/dynamic/histogram.rs

1//! Runtime-labeled histogram for dynamic dimensions.
2
3use super::cache::{CacheableSeries, LabelCache, SERIES_CACHE_SIZE};
4#[cfg(feature = "eviction")]
5use super::current_cycle;
6use super::{DynamicLabelSet, HISTOGRAM_IDS, thread_id};
7use crossbeam_utils::CachePadded;
8use parking_lot::RwLock;
9use std::cell::RefCell;
10use std::collections::HashMap;
11use std::hash::{Hash, Hasher};
12#[cfg(feature = "eviction")]
13use std::sync::atomic::AtomicU32;
14use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicU64, AtomicUsize, Ordering};
15use std::sync::{Arc, Weak};
16
/// Default cap on distinct label sets per histogram before overflow routing.
const DEFAULT_MAX_SERIES: usize = 2000;
/// Label key/value pair identifying the single overflow series used once the
/// cardinality cap is reached.
const OVERFLOW_LABEL_KEY: &str = "__ft_overflow";
const OVERFLOW_LABEL_VALUE: &str = "true";

/// One shard of the label-set -> series index, cache-line padded to reduce
/// false sharing between shards.
type HistogramIndexShard = CachePadded<RwLock<HashMap<DynamicLabelSet, Arc<HistogramSeries>>>>;
/// Snapshot tuple: (labels, cumulative (bound, count) buckets, sum, count).
type HistogramSnapshotEntry = (DynamicLabelSet, Vec<(u64, u64)>, u64, u64);
23
24struct ShardedCounter {
25    cells: Vec<CachePadded<AtomicIsize>>,
26}
27
28impl ShardedCounter {
29    fn new(shard_count: usize) -> Self {
30        Self {
31            cells: (0..shard_count)
32                .map(|_| CachePadded::new(AtomicIsize::new(0)))
33                .collect(),
34        }
35    }
36
37    #[inline]
38    fn add_at(&self, shard_idx: usize, value: isize) {
39        self.cells[shard_idx].fetch_add(value, Ordering::Relaxed);
40    }
41
42    #[inline]
43    fn inc_at(&self, shard_idx: usize) {
44        self.add_at(shard_idx, 1);
45    }
46
47    #[inline]
48    fn sum(&self) -> isize {
49        self.cells
50            .iter()
51            .map(|cell| cell.load(Ordering::Relaxed))
52            .sum()
53    }
54}
55
/// Per-label-set histogram state: sharded bucket/sum/count counters plus
/// eviction bookkeeping.
struct HistogramSeries {
    /// Shared upper bucket bounds; the implicit +Inf bucket is not stored here.
    bounds: Arc<Vec<u64>>,
    /// One sharded counter per bound, plus one trailing counter for +Inf.
    buckets: Vec<ShardedCounter>,
    sum: ShardedCounter,
    count: ShardedCounter,
    /// Tombstone flag set by exporter before removing from map.
    evicted: AtomicBool,
    /// Last export cycle when this series was accessed.
    #[cfg(feature = "eviction")]
    last_accessed_cycle: AtomicU32,
}
67
impl HistogramSeries {
    /// Build a series with `bounds.len() + 1` buckets (the extra one is +Inf),
    /// stamped with the creating export `cycle` for staleness tracking.
    #[cfg(feature = "eviction")]
    fn new(bounds: Arc<Vec<u64>>, shard_count: usize, cycle: u32) -> Self {
        // `..=` yields bounds.len() + 1 counters: one per bound plus +Inf.
        let buckets = (0..=bounds.len())
            .map(|_| ShardedCounter::new(shard_count))
            .collect();
        Self {
            bounds,
            buckets,
            sum: ShardedCounter::new(shard_count),
            count: ShardedCounter::new(shard_count),
            evicted: AtomicBool::new(false),
            last_accessed_cycle: AtomicU32::new(cycle),
        }
    }

    /// Same as the eviction variant, minus the access-cycle stamp.
    #[cfg(not(feature = "eviction"))]
    fn new(bounds: Arc<Vec<u64>>, shard_count: usize) -> Self {
        let buckets = (0..=bounds.len())
            .map(|_| ShardedCounter::new(shard_count))
            .collect();
        Self {
            bounds,
            buckets,
            sum: ShardedCounter::new(shard_count),
            count: ShardedCounter::new(shard_count),
            evicted: AtomicBool::new(false),
        }
    }

    /// True once the exporter has tombstoned this series (see `mark_evicted`).
    #[inline]
    fn is_evicted(&self) -> bool {
        self.evicted.load(Ordering::Relaxed)
    }

    /// Tombstone the series so cached handles can detect its removal.
    #[cfg(feature = "eviction")]
    fn mark_evicted(&self) {
        self.evicted.store(true, Ordering::Relaxed);
    }

    /// Record `value` into the bucket/sum/count cells of shard `shard_idx`.
    #[inline]
    fn record_at(&self, shard_idx: usize, value: u64) {
        // First bound with value <= bound wins; otherwise fall through to the
        // +Inf bucket at index bounds.len(). Linear scan — bound lists are
        // short. NOTE(review): this assumes bounds are sorted ascending;
        // unsorted bounds would yield misleading buckets — confirm callers
        // always pass sorted boundaries.
        let bucket_idx = self
            .bounds
            .iter()
            .position(|&bound| value <= bound)
            .unwrap_or(self.bounds.len());
        self.buckets[bucket_idx].inc_at(shard_idx);
        self.sum.add_at(shard_idx, value as isize);
        self.count.inc_at(shard_idx);
        // Note: timestamp updated on slow path (lookup/cache miss) to avoid
        // global atomic read on every record.
    }

    /// Touch the series timestamp. Called on slow path only.
    #[cfg(feature = "eviction")]
    #[inline]
    fn touch(&self, cycle: u32) {
        self.last_accessed_cycle.store(cycle, Ordering::Relaxed);
    }

    /// Allocating form of `buckets_cumulative_iter`: collect cumulative
    /// `(bound, count)` pairs into a Vec.
    fn buckets_cumulative(&self) -> Vec<(u64, u64)> {
        let mut result = Vec::with_capacity(self.buckets.len());
        for (bound, cumulative) in self.buckets_cumulative_iter() {
            result.push((bound, cumulative));
        }
        result
    }

    /// Lazily yield `(bound, cumulative_count)`; the +Inf bucket is reported
    /// with bound `u64::MAX`.
    fn buckets_cumulative_iter(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
        let mut cumulative = 0i64;
        self.buckets.iter().enumerate().map(move |(i, counter)| {
            cumulative += counter.sum() as i64;
            let bound = if i < self.bounds.len() {
                self.bounds[i]
            } else {
                u64::MAX
            };
            (bound, cumulative as u64)
        })
    }

    /// Total of all recorded values (sharded cells folded; approximate under
    /// concurrent writes).
    fn sum(&self) -> u64 {
        self.sum.sum() as u64
    }

    /// Number of recorded values.
    fn count(&self) -> u64 {
        self.count.sum() as u64
    }
}
158
159impl CacheableSeries for HistogramSeries {
160    fn is_evicted(&self) -> bool {
161        self.is_evicted()
162    }
163}
164
/// A reusable handle to a dynamic-label histogram series.
///
/// Use this for hot paths to avoid per-update label canonicalization and map
/// lookups. Resolve once with `DynamicHistogram::series(...)`, then call
/// `record()` on the handle.
#[derive(Clone)]
pub struct DynamicHistogramSeries {
    // Strong ref keeps the series alive and protected from eviction (the
    // exporter skips series with Arc::strong_count > 1).
    series: Arc<HistogramSeries>,
    // Copied from the owning histogram so `record()` needs no back-pointer.
    shard_mask: usize,
}

/// Borrowed read-only view of a dynamic histogram series.
#[doc(hidden)]
pub struct DynamicHistogramSeriesView<'a> {
    series: &'a HistogramSeries,
}
181
impl<'a> DynamicHistogramSeriesView<'a> {
    /// Iterate cumulative `(bound, count)` buckets without allocating.
    #[doc(hidden)]
    pub fn buckets_cumulative_iter(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
        self.series.buckets_cumulative_iter()
    }

    /// Sum of all recorded values in the viewed series.
    #[doc(hidden)]
    pub fn sum(&self) -> u64 {
        self.series.sum()
    }

    /// Count of all recorded values in the viewed series.
    #[doc(hidden)]
    pub fn count(&self) -> u64 {
        self.series.count()
    }
}
199
200impl DynamicHistogramSeries {
201    /// Record a value in this histogram series.
202    #[inline]
203    pub fn record(&self, value: u64) {
204        let shard_idx = thread_id() & self.shard_mask;
205        self.series.record_at(shard_idx, value);
206    }
207
208    /// Get cumulative bucket counts.
209    pub fn buckets_cumulative(&self) -> Vec<(u64, u64)> {
210        self.series.buckets_cumulative()
211    }
212
213    /// Get the sum of all recorded values.
214    pub fn sum(&self) -> u64 {
215        self.series.sum()
216    }
217
218    /// Get the count of all recorded values.
219    pub fn count(&self) -> u64 {
220        self.series.count()
221    }
222
223    /// Check if this series handle has been evicted.
224    #[inline]
225    pub fn is_evicted(&self) -> bool {
226        self.series.is_evicted()
227    }
228}
229
// Per-thread cache mapping (histogram id, labels) -> Weak series handle.
// Weak refs keep cached entries from blocking eviction: a series referenced
// only by the index map and thread caches still has Arc::strong_count == 1.
thread_local! {
    static SERIES_CACHE: RefCell<LabelCache<Weak<HistogramSeries>, SERIES_CACHE_SIZE>> =
        RefCell::new(LabelCache::new());
}
234
/// Histogram keyed by runtime label sets.
///
/// Uses sharded index for key->series lookup and per-series sharded counters
/// for fast updates.
pub struct DynamicHistogram {
    /// Unique id used to key the thread-local series cache.
    id: usize,
    /// Upper bucket bounds shared by every series of this histogram.
    bounds: Arc<Vec<u64>>,
    /// Number of counter shards per series (rounded to a power of two).
    shard_count: usize,
    /// Cardinality cap; 0 disables overflow routing entirely.
    max_series: usize,
    /// `shard_count - 1`; valid mask because shard_count is a power of two.
    shard_mask: usize,
    index_shards: Vec<HistogramIndexShard>,
    /// Approximate number of live series (incremented on insert, decremented on evict).
    series_count: AtomicUsize,
    /// Count of records routed to overflow bucket due to cardinality cap.
    overflow_count: AtomicU64,
}
251
252impl DynamicHistogram {
    /// Creates a new runtime-labeled histogram with given bucket boundaries.
    ///
    /// Applies the default series-cardinality cap (`DEFAULT_MAX_SERIES`);
    /// use [`with_limits`](Self::with_limits) to choose a different cap.
    pub fn new(bounds: &[u64], shard_count: usize) -> Self {
        Self::with_limits(bounds, shard_count, DEFAULT_MAX_SERIES)
    }
257
258    /// Creates a new runtime-labeled histogram with a series cardinality cap.
259    ///
260    /// When the number of unique label sets approximately reaches `max_series`,
261    /// new label sets are redirected into a single overflow series
262    /// (`__ft_overflow=true`). The cap is checked via a lock-free atomic counter,
263    /// so concurrent inserts may briefly overshoot by the number of in-flight
264    /// writers before the overflow kicks in.
265    pub fn with_limits(bounds: &[u64], shard_count: usize, max_series: usize) -> Self {
266        let shard_count = shard_count.next_power_of_two();
267        let id = HISTOGRAM_IDS.fetch_add(1, Ordering::Relaxed);
268        Self {
269            id,
270            bounds: Arc::new(bounds.to_vec()),
271            shard_count,
272            max_series,
273            shard_mask: shard_count - 1,
274            index_shards: (0..shard_count)
275                .map(|_| CachePadded::new(RwLock::new(HashMap::new())))
276                .collect(),
277            series_count: AtomicUsize::new(0),
278            overflow_count: AtomicU64::new(0),
279        }
280    }
281
282    /// Creates a histogram with default latency buckets (in microseconds).
283    pub fn with_latency_buckets(shard_count: usize) -> Self {
284        Self::with_limits(
285            &[
286                10,         // 10µs
287                50,         // 50µs
288                100,        // 100µs
289                500,        // 500µs
290                1_000,      // 1ms
291                5_000,      // 5ms
292                10_000,     // 10ms
293                50_000,     // 50ms
294                100_000,    // 100ms
295                500_000,    // 500ms
296                1_000_000,  // 1s
297                5_000_000,  // 5s
298                10_000_000, // 10s
299            ],
300            shard_count,
301            DEFAULT_MAX_SERIES,
302        )
303    }
304
305    /// Resolve a reusable series handle for `labels`.
306    ///
307    /// Preferred for hot paths when labels come from a finite active set.
308    pub fn series(&self, labels: &[(&str, &str)]) -> DynamicHistogramSeries {
309        if let Some(series) = self.cached_series(labels) {
310            return DynamicHistogramSeries {
311                series,
312                shard_mask: self.shard_mask,
313            };
314        }
315        let series = self.lookup_or_create(labels);
316        self.update_cache(labels, &series);
317        DynamicHistogramSeries {
318            series,
319            shard_mask: self.shard_mask,
320        }
321    }
322
323    /// Record a value for the series identified by `labels`.
324    #[inline]
325    pub fn record(&self, labels: &[(&str, &str)], value: u64) {
326        if let Some(series) = self.cached_series(labels) {
327            let shard_idx = thread_id() & self.shard_mask;
328            series.record_at(shard_idx, value);
329            return;
330        }
331
332        let series = self.lookup_or_create(labels);
333        self.update_cache(labels, &series);
334        let shard_idx = thread_id() & self.shard_mask;
335        series.record_at(shard_idx, value);
336    }
337
338    /// Get cumulative bucket counts for the series identified by `labels`.
339    pub fn buckets_cumulative(&self, labels: &[(&str, &str)]) -> Vec<(u64, u64)> {
340        let key = DynamicLabelSet::from_pairs(labels);
341        let index_shard = self.index_shard_for(&key);
342        self.index_shards[index_shard]
343            .read()
344            .get(&key)
345            .map(|series| series.buckets_cumulative())
346            .unwrap_or_default()
347    }
348
349    /// Get sum for the series identified by `labels`.
350    pub fn sum(&self, labels: &[(&str, &str)]) -> u64 {
351        let key = DynamicLabelSet::from_pairs(labels);
352        let index_shard = self.index_shard_for(&key);
353        self.index_shards[index_shard]
354            .read()
355            .get(&key)
356            .map(|series| series.sum())
357            .unwrap_or(0)
358    }
359
360    /// Get count for the series identified by `labels`.
361    pub fn count(&self, labels: &[(&str, &str)]) -> u64 {
362        let key = DynamicLabelSet::from_pairs(labels);
363        let index_shard = self.index_shard_for(&key);
364        self.index_shards[index_shard]
365            .read()
366            .get(&key)
367            .map(|series| series.count())
368            .unwrap_or(0)
369    }
370
371    /// Returns a snapshot of all label-set with their histogram data.
372    pub fn snapshot(&self) -> Vec<HistogramSnapshotEntry> {
373        let mut out = Vec::new();
374        for shard in &self.index_shards {
375            let guard = shard.read();
376            for (labels, series) in guard.iter() {
377                out.push((
378                    labels.clone(),
379                    series.buckets_cumulative(),
380                    series.sum(),
381                    series.count(),
382                ));
383            }
384        }
385        out
386    }
387
388    /// Returns the current number of distinct label sets.
389    pub fn cardinality(&self) -> usize {
390        self.index_shards
391            .iter()
392            .map(|shard| shard.read().len())
393            .sum()
394    }
395
    /// Returns the number of records routed to the overflow bucket.
    ///
    /// A non-zero value indicates the cardinality cap was hit and label
    /// fidelity is being lost. Use this to alert on cardinality pressure.
    ///
    /// NOTE(review): the counter is only bumped on the slow path in
    /// `lookup_or_create`; once a redirected label set sits in a thread's
    /// series cache, further records for it take the fast path and are not
    /// counted here. Treat this value as a lower bound, not an exact total.
    pub fn overflow_count(&self) -> u64 {
        self.overflow_count.load(Ordering::Relaxed)
    }
403
404    /// Iterate all series without cloning label sets.
405    ///
406    /// Calls `f` with borrowed label pairs and a borrowed series view.
407    /// Used by exporters/macros to avoid `snapshot()` and bucket vec allocations.
408    /// The callback runs while a shard read lock is held; it must be fast and
409    /// must not call back into this metric.
410    #[doc(hidden)]
411    pub fn visit_series<F>(&self, mut f: F)
412    where
413        F: for<'a> FnMut(&'a [(String, String)], DynamicHistogramSeriesView<'a>),
414    {
415        for shard in &self.index_shards {
416            let guard = shard.read();
417            for (labels, series) in guard.iter() {
418                f(labels.pairs(), DynamicHistogramSeriesView { series });
419            }
420        }
421    }
422
    /// Evict series that haven't been accessed for `max_staleness` cycles.
    ///
    /// Call this after `advance_cycle()` in your exporter task.
    /// Series are marked as evicted (so cached handles see the tombstone),
    /// then removed from the index.
    ///
    /// Protected series (Arc::strong_count > 1) are never evicted - someone
    /// holds a DynamicHistogramSeries handle to them.
    ///
    /// Returns the number of series evicted.
    #[cfg(feature = "eviction")]
    pub fn evict_stale(&self, max_staleness: u32) -> usize {
        let cycle = current_cycle();
        let mut removed = 0;

        for shard in &self.index_shards {
            // Write lock per shard: eviction competes with insertions but
            // only briefly, one shard at a time.
            let mut guard = shard.write();
            guard.retain(|_labels, series| {
                // Protected if someone holds a handle (strong_count > 1 means
                // both the map and at least one DynamicHistogramSeries hold refs)
                if Arc::strong_count(series) > 1 {
                    return true;
                }
                // Otherwise check timestamp staleness
                let last = series.last_accessed_cycle.load(Ordering::Relaxed);
                let stale = cycle.saturating_sub(last) > max_staleness;
                if stale {
                    // Tombstone before the map entry disappears so thread-local
                    // caches holding a Weak ref can observe the eviction.
                    series.mark_evicted();
                    removed += 1;
                    self.series_count.fetch_sub(1, Ordering::Relaxed);
                }
                !stale
            });
        }

        removed
    }
460
    /// Find the series for `labels`, creating it (or redirecting to the
    /// overflow series when at the cardinality cap) if absent.
    /// Slow path: reached only on a thread-local cache miss.
    fn lookup_or_create(&self, labels: &[(&str, &str)]) -> Arc<HistogramSeries> {
        let requested_key = DynamicLabelSet::from_pairs(labels);
        let requested_shard = self.index_shard_for(&requested_key);
        #[cfg(feature = "eviction")]
        let cycle = current_cycle();

        // Fast path: read lock only.
        if let Some(series) = self.index_shards[requested_shard]
            .read()
            .get(&requested_key)
        {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }

        // Check cardinality cap BEFORE taking any write lock (lock-free).
        // NOTE(review): overflow_count is bumped only here, so once a
        // redirected label set is in a thread's cache, later records bypass
        // this path and go uncounted — overflow_count() is a lower bound.
        let key = if self.max_series > 0
            && self.series_count.load(Ordering::Relaxed) >= self.max_series
        {
            self.overflow_count.fetch_add(1, Ordering::Relaxed);
            DynamicLabelSet::from_pairs(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)])
        } else {
            requested_key
        };
        // The overflow key may hash to a different shard than the request did.
        let shard = self.index_shard_for(&key);

        // Second read-locked probe: the key may have changed to the overflow
        // key, or another writer may have inserted it in the meantime.
        if let Some(series) = self.index_shards[shard].read().get(&key) {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }

        // Write lock + re-check (double-checked insert) so concurrent
        // creators of the same key converge on a single series.
        let mut guard = self.index_shards[shard].write();
        if let Some(series) = guard.get(&key) {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }
        #[cfg(feature = "eviction")]
        let series = Arc::new(HistogramSeries::new(
            Arc::clone(&self.bounds),
            self.shard_count,
            cycle,
        ));
        #[cfg(not(feature = "eviction"))]
        let series = Arc::new(HistogramSeries::new(
            Arc::clone(&self.bounds),
            self.shard_count,
        ));
        guard.insert(key, Arc::clone(&series));
        self.series_count.fetch_add(1, Ordering::Relaxed);
        series
    }
515
516    fn index_shard_for(&self, key: &DynamicLabelSet) -> usize {
517        let mut hasher = std::collections::hash_map::DefaultHasher::new();
518        key.hash(&mut hasher);
519        (hasher.finish() as usize) & self.shard_mask
520    }
521
522    fn cached_series(&self, labels: &[(&str, &str)]) -> Option<Arc<HistogramSeries>> {
523        SERIES_CACHE.with(|cache| {
524            let series = cache.borrow_mut().get(self.id, labels)?;
525            #[cfg(feature = "eviction")]
526            series.touch(current_cycle());
527            Some(series)
528        })
529    }
530
531    fn update_cache(&self, labels: &[(&str, &str)], series: &Arc<HistogramSeries>) {
532        SERIES_CACHE.with(|cache| {
533            cache
534                .borrow_mut()
535                .insert(self.id, labels, Arc::downgrade(series));
536        });
537    }
538}
539
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_basic_recording() {
        let h = DynamicHistogram::new(&[10, 100, 1000], 4);
        let labels = &[("org_id", "42")];

        h.record(labels, 5); // bucket 0 (≤10)
        h.record(labels, 50); // bucket 1 (≤100)
        h.record(labels, 500); // bucket 2 (≤1000)
        h.record(labels, 5000); // bucket 3 (+Inf)

        // Buckets are cumulative: each entry includes all smaller buckets.
        let buckets = h.buckets_cumulative(labels);
        assert_eq!(buckets.len(), 4);
        assert_eq!(buckets[0], (10, 1));
        assert_eq!(buckets[1], (100, 2));
        assert_eq!(buckets[2], (1000, 3));
        assert_eq!(buckets[3], (u64::MAX, 4));

        assert_eq!(h.count(labels), 4);
        assert_eq!(h.sum(labels), 5 + 50 + 500 + 5000);
    }

    #[test]
    fn test_label_order_is_canonicalized() {
        let h = DynamicHistogram::new(&[10, 100], 4);

        h.record(&[("org_id", "42"), ("endpoint", "abc")], 5);

        // Same pairs in a different order must resolve to the same series.
        assert_eq!(h.count(&[("endpoint", "abc"), ("org_id", "42")]), 1);
    }

    #[test]
    fn test_series_handle() {
        let h = DynamicHistogram::new(&[10, 100, 1000], 4);
        let series = h.series(&[("org_id", "42")]);

        series.record(5);
        series.record(50);
        series.record(500);

        // Handle and by-label reads must observe the same series.
        assert_eq!(series.count(), 3);
        assert_eq!(series.sum(), 555);
        assert_eq!(h.count(&[("org_id", "42")]), 3);
    }

    #[test]
    fn test_multiple_label_sets() {
        let h = DynamicHistogram::new(&[100], 4);

        h.record(&[("org_id", "1")], 50);
        h.record(&[("org_id", "2")], 150);

        assert_eq!(h.count(&[("org_id", "1")]), 1);
        assert_eq!(h.count(&[("org_id", "2")]), 1);

        let snap = h.snapshot();
        assert_eq!(snap.len(), 2);
    }

    #[test]
    fn test_overflow_bucket_routes_new_series_at_capacity() {
        // Cap of 1: the second distinct label set must be redirected.
        let h = DynamicHistogram::with_limits(&[100], 4, 1);

        h.record(&[("org_id", "1")], 50);
        h.record(&[("org_id", "2")], 150);

        // Cardinality is 2: the original series plus the overflow series.
        assert_eq!(h.cardinality(), 2);
        assert_eq!(h.count(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)]), 1);
        assert_eq!(h.sum(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)]), 150);
    }

    #[test]
    fn test_concurrent_cap_bounded_overshoot() {
        use std::sync::{Arc, Barrier};
        use std::thread;

        let cap = 10;
        let threads = 16;
        let h = Arc::new(DynamicHistogram::with_limits(&[100, 1000], 4, cap));
        let barrier = Arc::new(Barrier::new(threads));

        let handles: Vec<_> = (0..threads)
            .map(|t| {
                let h = Arc::clone(&h);
                let barrier = Arc::clone(&barrier);
                thread::spawn(move || {
                    barrier.wait();
                    for i in 0..5 {
                        let label = format!("t{t}_s{i}");
                        h.record(&[("key", &label)], 42);
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        // Lock-free cap check may overshoot by up to one series per in-flight
        // writer, plus one for the overflow series itself.
        let card = h.cardinality();
        assert!(
            card <= cap + threads + 1,
            "cardinality {card} exceeded bounded overshoot (cap={cap}, threads={threads})"
        );
        assert!(h.overflow_count() > 0, "overflow should have triggered");
    }
}