Skip to main content

fast_telemetry/metric/dynamic/
histogram.rs

1//! Runtime-labeled histogram for dynamic dimensions.
2
3#[cfg(feature = "eviction")]
4use super::current_cycle;
5use super::{DynamicLabelSet, HISTOGRAM_IDS, thread_id};
6use crossbeam_utils::CachePadded;
7use parking_lot::RwLock;
8use std::cell::RefCell;
9use std::collections::HashMap;
10use std::hash::{Hash, Hasher};
11#[cfg(feature = "eviction")]
12use std::sync::atomic::AtomicU32;
13use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicU64, AtomicUsize, Ordering};
14use std::sync::{Arc, Weak};
15
/// Default cap on distinct label sets before new ones route to overflow.
const DEFAULT_MAX_SERIES: usize = 2000;
/// Label key of the synthetic series that absorbs over-cap label sets.
const OVERFLOW_LABEL_KEY: &str = "__ft_overflow";
/// Label value of the synthetic overflow series.
const OVERFLOW_LABEL_VALUE: &str = "true";

/// One shard of the label-set -> series index; cache-line padded so shard
/// locks on adjacent shards do not false-share.
type HistogramIndexShard = CachePadded<RwLock<HashMap<DynamicLabelSet, Arc<HistogramSeries>>>>;
/// Snapshot row: (labels, cumulative (bound, count) buckets, sum, count).
type HistogramSnapshotEntry = (DynamicLabelSet, Vec<(u64, u64)>, u64, u64);
22
/// A counter split across cache-line-padded cells so writers on different
/// shards never contend on the same cache line; reads fold all cells.
struct ShardedCounter {
    cells: Vec<CachePadded<AtomicIsize>>,
}
26
27impl ShardedCounter {
28    fn new(shard_count: usize) -> Self {
29        Self {
30            cells: (0..shard_count)
31                .map(|_| CachePadded::new(AtomicIsize::new(0)))
32                .collect(),
33        }
34    }
35
36    #[inline]
37    fn add_at(&self, shard_idx: usize, value: isize) {
38        self.cells[shard_idx].fetch_add(value, Ordering::Relaxed);
39    }
40
41    #[inline]
42    fn inc_at(&self, shard_idx: usize) {
43        self.add_at(shard_idx, 1);
44    }
45
46    #[inline]
47    fn sum(&self) -> isize {
48        self.cells
49            .iter()
50            .map(|cell| cell.load(Ordering::Relaxed))
51            .sum()
52    }
53}
54
/// Per-label-set histogram state: sharded bucket/sum/count counters.
struct HistogramSeries {
    /// Bucket upper bounds, shared with the owning histogram.
    bounds: Arc<Vec<u64>>,
    /// One sharded counter per bucket: `bounds.len() + 1` entries, the
    /// trailing one being the +Inf bucket.
    buckets: Vec<ShardedCounter>,
    /// Sharded sum of recorded values.
    sum: ShardedCounter,
    /// Sharded count of recorded values.
    count: ShardedCounter,
    /// Tombstone flag set by exporter before removing from map.
    evicted: AtomicBool,
    /// Last export cycle when this series was accessed.
    #[cfg(feature = "eviction")]
    last_accessed_cycle: AtomicU32,
}
66
67impl HistogramSeries {
68    #[cfg(feature = "eviction")]
69    fn new(bounds: Arc<Vec<u64>>, shard_count: usize, cycle: u32) -> Self {
70        let buckets = (0..=bounds.len())
71            .map(|_| ShardedCounter::new(shard_count))
72            .collect();
73        Self {
74            bounds,
75            buckets,
76            sum: ShardedCounter::new(shard_count),
77            count: ShardedCounter::new(shard_count),
78            evicted: AtomicBool::new(false),
79            last_accessed_cycle: AtomicU32::new(cycle),
80        }
81    }
82
83    #[cfg(not(feature = "eviction"))]
84    fn new(bounds: Arc<Vec<u64>>, shard_count: usize) -> Self {
85        let buckets = (0..=bounds.len())
86            .map(|_| ShardedCounter::new(shard_count))
87            .collect();
88        Self {
89            bounds,
90            buckets,
91            sum: ShardedCounter::new(shard_count),
92            count: ShardedCounter::new(shard_count),
93            evicted: AtomicBool::new(false),
94        }
95    }
96
97    #[inline]
98    fn is_evicted(&self) -> bool {
99        self.evicted.load(Ordering::Relaxed)
100    }
101
102    #[cfg(feature = "eviction")]
103    fn mark_evicted(&self) {
104        self.evicted.store(true, Ordering::Relaxed);
105    }
106
107    #[inline]
108    fn record_at(&self, shard_idx: usize, value: u64) {
109        let bucket_idx = self
110            .bounds
111            .iter()
112            .position(|&bound| value <= bound)
113            .unwrap_or(self.bounds.len());
114        self.buckets[bucket_idx].inc_at(shard_idx);
115        self.sum.add_at(shard_idx, value as isize);
116        self.count.inc_at(shard_idx);
117        // Note: timestamp updated on slow path (lookup/cache miss) to avoid
118        // global atomic read on every record.
119    }
120
121    /// Touch the series timestamp. Called on slow path only.
122    #[cfg(feature = "eviction")]
123    #[inline]
124    fn touch(&self, cycle: u32) {
125        self.last_accessed_cycle.store(cycle, Ordering::Relaxed);
126    }
127
128    fn buckets_cumulative(&self) -> Vec<(u64, u64)> {
129        let mut result = Vec::with_capacity(self.buckets.len());
130        for (bound, cumulative) in self.buckets_cumulative_iter() {
131            result.push((bound, cumulative));
132        }
133        result
134    }
135
136    fn buckets_cumulative_iter(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
137        let mut cumulative = 0i64;
138        self.buckets.iter().enumerate().map(move |(i, counter)| {
139            cumulative += counter.sum() as i64;
140            let bound = if i < self.bounds.len() {
141                self.bounds[i]
142            } else {
143                u64::MAX
144            };
145            (bound, cumulative as u64)
146        })
147    }
148
149    fn sum(&self) -> u64 {
150        self.sum.sum() as u64
151    }
152
153    fn count(&self) -> u64 {
154        self.count.sum() as u64
155    }
156}
157
/// A reusable handle to a dynamic-label histogram series.
///
/// Use this for hot paths to avoid per-update label canonicalization and map
/// lookups. Resolve once with `DynamicHistogram::series(...)`, then call
/// `record()` on the handle.
#[derive(Clone)]
pub struct DynamicHistogramSeries {
    /// Strong ref keeps the series alive (and, with the eviction feature,
    /// protects it from `evict_stale` via the strong-count check).
    series: Arc<HistogramSeries>,
    /// Copied from the owning histogram; masks `thread_id()` into a cell index.
    shard_mask: usize,
}
168
/// Borrowed read-only view of a dynamic histogram series.
///
/// Handed to `DynamicHistogram::visit_series` callbacks so exporters can
/// read buckets without cloning label sets or allocating bucket vectors.
#[doc(hidden)]
pub struct DynamicHistogramSeriesView<'a> {
    series: &'a HistogramSeries,
}
174
175impl<'a> DynamicHistogramSeriesView<'a> {
176    /// Iterate cumulative `(bound, count)` buckets without allocating.
177    #[doc(hidden)]
178    pub fn buckets_cumulative_iter(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
179        self.series.buckets_cumulative_iter()
180    }
181
182    #[doc(hidden)]
183    pub fn sum(&self) -> u64 {
184        self.series.sum()
185    }
186
187    #[doc(hidden)]
188    pub fn count(&self) -> u64 {
189        self.series.count()
190    }
191}
192
193impl DynamicHistogramSeries {
194    /// Record a value in this histogram series.
195    #[inline]
196    pub fn record(&self, value: u64) {
197        let shard_idx = thread_id() & self.shard_mask;
198        self.series.record_at(shard_idx, value);
199    }
200
201    /// Get cumulative bucket counts.
202    pub fn buckets_cumulative(&self) -> Vec<(u64, u64)> {
203        self.series.buckets_cumulative()
204    }
205
206    /// Get the sum of all recorded values.
207    pub fn sum(&self) -> u64 {
208        self.series.sum()
209    }
210
211    /// Get the count of all recorded values.
212    pub fn count(&self) -> u64 {
213        self.series.count()
214    }
215
216    /// Check if this series handle has been evicted.
217    #[inline]
218    pub fn is_evicted(&self) -> bool {
219        self.series.is_evicted()
220    }
221}
222
/// Per-thread memo of the most recent `(histogram, labels) -> series`
/// resolution, letting hot paths skip canonicalization and index lookups.
struct SeriesCacheEntry {
    /// `DynamicHistogram::id` this entry belongs to; the single cache slot
    /// is shared across all histograms on the thread.
    histogram_id: usize,
    /// Labels exactly as the caller passed them (order preserved, so a
    /// differently-ordered but equal label set misses the cache).
    ordered_labels: Vec<(String, String)>,
    /// Weak ref so the cache never keeps an evicted series alive.
    series: Weak<HistogramSeries>,
}

thread_local! {
    // Single-slot cache: each thread remembers only its most recent series.
    static SERIES_CACHE: RefCell<Option<SeriesCacheEntry>> = const { RefCell::new(None) };
}
232
/// Histogram keyed by runtime label sets.
///
/// Uses sharded index for key->series lookup and per-series sharded counters
/// for fast updates.
pub struct DynamicHistogram {
    /// Unique id (from `HISTOGRAM_IDS`) used to validate thread-local cache hits.
    id: usize,
    /// Bucket upper bounds, shared (via `Arc`) with every series.
    bounds: Arc<Vec<u64>>,
    /// Power-of-two count used for both index shards and per-series counter cells.
    shard_count: usize,
    /// Cardinality cap; 0 disables overflow routing.
    max_series: usize,
    /// `shard_count - 1`, masks hashes/thread ids into shard indices.
    shard_mask: usize,
    /// Sharded label-set -> series index.
    index_shards: Vec<HistogramIndexShard>,
    /// Approximate number of live series (incremented on insert, decremented on evict).
    series_count: AtomicUsize,
    /// Count of records routed to overflow bucket due to cardinality cap.
    overflow_count: AtomicU64,
}
249
impl DynamicHistogram {
    /// Creates a new runtime-labeled histogram with given bucket boundaries.
    ///
    /// Applies the default series cardinality cap (`DEFAULT_MAX_SERIES`).
    pub fn new(bounds: &[u64], shard_count: usize) -> Self {
        Self::with_limits(bounds, shard_count, DEFAULT_MAX_SERIES)
    }

    /// Creates a new runtime-labeled histogram with a series cardinality cap.
    ///
    /// When the number of unique label sets approximately reaches `max_series`,
    /// new label sets are redirected into a single overflow series
    /// (`__ft_overflow=true`). The cap is checked via a lock-free atomic counter,
    /// so concurrent inserts may briefly overshoot by the number of in-flight
    /// writers before the overflow kicks in.
    ///
    /// `shard_count` is rounded up to the next power of two and serves both as
    /// the index shard count and as the per-series counter cell count. A
    /// `max_series` of 0 disables the cap entirely.
    pub fn with_limits(bounds: &[u64], shard_count: usize, max_series: usize) -> Self {
        // Power-of-two shard count lets `& shard_mask` replace a modulo.
        let shard_count = shard_count.next_power_of_two();
        let id = HISTOGRAM_IDS.fetch_add(1, Ordering::Relaxed);
        Self {
            id,
            bounds: Arc::new(bounds.to_vec()),
            shard_count,
            max_series,
            shard_mask: shard_count - 1,
            index_shards: (0..shard_count)
                .map(|_| CachePadded::new(RwLock::new(HashMap::new())))
                .collect(),
            series_count: AtomicUsize::new(0),
            overflow_count: AtomicU64::new(0),
        }
    }

    /// Creates a histogram with default latency buckets (in microseconds).
    pub fn with_latency_buckets(shard_count: usize) -> Self {
        Self::with_limits(
            &[
                10,         // 10µs
                50,         // 50µs
                100,        // 100µs
                500,        // 500µs
                1_000,      // 1ms
                5_000,      // 5ms
                10_000,     // 10ms
                50_000,     // 50ms
                100_000,    // 100ms
                500_000,    // 500ms
                1_000_000,  // 1s
                5_000_000,  // 5s
                10_000_000, // 10s
            ],
            shard_count,
            DEFAULT_MAX_SERIES,
        )
    }

    /// Resolve a reusable series handle for `labels`.
    ///
    /// Preferred for hot paths when labels come from a finite active set.
    pub fn series(&self, labels: &[(&str, &str)]) -> DynamicHistogramSeries {
        // Fast path: this thread resolved the same labels last time.
        if let Some(series) = self.cached_series(labels) {
            return DynamicHistogramSeries {
                series,
                shard_mask: self.shard_mask,
            };
        }
        let series = self.lookup_or_create(labels);
        self.update_cache(labels, Arc::clone(&series));
        DynamicHistogramSeries {
            series,
            shard_mask: self.shard_mask,
        }
    }

    /// Record a value for the series identified by `labels`.
    #[inline]
    pub fn record(&self, labels: &[(&str, &str)], value: u64) {
        // Fast path: thread-local cache hit avoids canonicalization + locks.
        if let Some(series) = self.cached_series(labels) {
            let shard_idx = thread_id() & self.shard_mask;
            series.record_at(shard_idx, value);
            return;
        }

        // Slow path: resolve (possibly routing to overflow), then cache.
        // NOTE(review): if the cap routed these labels to the overflow
        // series, the cache pins labels -> overflow until the slot is
        // replaced, so `overflow_count` only counts slow-path routings.
        let series = self.lookup_or_create(labels);
        self.update_cache(labels, Arc::clone(&series));
        let shard_idx = thread_id() & self.shard_mask;
        series.record_at(shard_idx, value);
    }

    /// Get cumulative bucket counts for the series identified by `labels`.
    ///
    /// Returns an empty Vec if no such series exists.
    pub fn buckets_cumulative(&self, labels: &[(&str, &str)]) -> Vec<(u64, u64)> {
        let key = DynamicLabelSet::from_pairs(labels);
        let index_shard = self.index_shard_for(&key);
        self.index_shards[index_shard]
            .read()
            .get(&key)
            .map(|series| series.buckets_cumulative())
            .unwrap_or_default()
    }

    /// Get sum for the series identified by `labels` (0 if absent).
    pub fn sum(&self, labels: &[(&str, &str)]) -> u64 {
        let key = DynamicLabelSet::from_pairs(labels);
        let index_shard = self.index_shard_for(&key);
        self.index_shards[index_shard]
            .read()
            .get(&key)
            .map(|series| series.sum())
            .unwrap_or(0)
    }

    /// Get count for the series identified by `labels` (0 if absent).
    pub fn count(&self, labels: &[(&str, &str)]) -> u64 {
        let key = DynamicLabelSet::from_pairs(labels);
        let index_shard = self.index_shard_for(&key);
        self.index_shards[index_shard]
            .read()
            .get(&key)
            .map(|series| series.count())
            .unwrap_or(0)
    }

    /// Returns a snapshot of all label-set with their histogram data.
    ///
    /// Shards are read one at a time, so the snapshot is not a single
    /// consistent point in time across shards.
    pub fn snapshot(&self) -> Vec<HistogramSnapshotEntry> {
        let mut out = Vec::new();
        for shard in &self.index_shards {
            let guard = shard.read();
            for (labels, series) in guard.iter() {
                out.push((
                    labels.clone(),
                    series.buckets_cumulative(),
                    series.sum(),
                    series.count(),
                ));
            }
        }
        out
    }

    /// Returns the current number of distinct label sets.
    ///
    /// Counts index entries shard by shard; the overflow series, once
    /// created, counts as one.
    pub fn cardinality(&self) -> usize {
        self.index_shards
            .iter()
            .map(|shard| shard.read().len())
            .sum()
    }

    /// Returns the number of records routed to the overflow bucket.
    ///
    /// A non-zero value indicates the cardinality cap was hit and label
    /// fidelity is being lost. Use this to alert on cardinality pressure.
    pub fn overflow_count(&self) -> u64 {
        self.overflow_count.load(Ordering::Relaxed)
    }

    /// Iterate all series without cloning label sets.
    ///
    /// Calls `f` with borrowed label pairs and a borrowed series view.
    /// Used by exporters/macros to avoid `snapshot()` and bucket vec allocations.
    #[doc(hidden)]
    pub fn visit_series<F>(&self, mut f: F)
    where
        F: for<'a> FnMut(&'a [(String, String)], DynamicHistogramSeriesView<'a>),
    {
        for shard in &self.index_shards {
            let guard = shard.read();
            for (labels, series) in guard.iter() {
                f(labels.pairs(), DynamicHistogramSeriesView { series });
            }
        }
    }

    /// Evict series that haven't been accessed for `max_staleness` cycles.
    ///
    /// Call this after `advance_cycle()` in your exporter task.
    /// Series are marked as evicted (so cached handles see the tombstone),
    /// then removed from the index.
    ///
    /// Protected series (Arc::strong_count > 1) are never evicted - someone
    /// holds a DynamicHistogramSeries handle to them.
    ///
    /// Returns the number of series evicted.
    #[cfg(feature = "eviction")]
    pub fn evict_stale(&self, max_staleness: u32) -> usize {
        let cycle = current_cycle();
        let mut removed = 0;

        for shard in &self.index_shards {
            let mut guard = shard.write();
            guard.retain(|_labels, series| {
                // Protected if someone holds a handle (strong_count > 1 means
                // both the map and at least one DynamicHistogramSeries hold refs)
                if Arc::strong_count(series) > 1 {
                    return true;
                }
                // Otherwise check timestamp staleness.
                // NOTE(review): if the cycle counter ever wraps u32,
                // saturating_sub yields 0 and staleness resets — presumably
                // acceptable at exporter cadence; confirm.
                let last = series.last_accessed_cycle.load(Ordering::Relaxed);
                let stale = cycle.saturating_sub(last) > max_staleness;
                if stale {
                    series.mark_evicted();
                    removed += 1;
                    self.series_count.fetch_sub(1, Ordering::Relaxed);
                }
                !stale
            });
        }

        removed
    }

    /// Find or insert the series for `labels`, applying the cardinality cap.
    fn lookup_or_create(&self, labels: &[(&str, &str)]) -> Arc<HistogramSeries> {
        let requested_key = DynamicLabelSet::from_pairs(labels);
        let requested_shard = self.index_shard_for(&requested_key);
        #[cfg(feature = "eviction")]
        let cycle = current_cycle();

        // Fast path: read lock only.
        if let Some(series) = self.index_shards[requested_shard]
            .read()
            .get(&requested_key)
        {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }

        // Check cardinality cap BEFORE taking any write lock (lock-free).
        // At/over the cap, this record is rerouted to the overflow series
        // and counted in `overflow_count`.
        let key = if self.max_series > 0
            && self.series_count.load(Ordering::Relaxed) >= self.max_series
        {
            self.overflow_count.fetch_add(1, Ordering::Relaxed);
            DynamicLabelSet::from_pairs(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)])
        } else {
            requested_key
        };
        let shard = self.index_shard_for(&key);

        // Second read-lock probe: the (possibly overflow) key may exist now.
        if let Some(series) = self.index_shards[shard].read().get(&key) {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }

        // Double-checked insert under the write lock: another writer may
        // have inserted between the read probe and here.
        let mut guard = self.index_shards[shard].write();
        if let Some(series) = guard.get(&key) {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }
        #[cfg(feature = "eviction")]
        let series = Arc::new(HistogramSeries::new(
            Arc::clone(&self.bounds),
            self.shard_count,
            cycle,
        ));
        #[cfg(not(feature = "eviction"))]
        let series = Arc::new(HistogramSeries::new(
            Arc::clone(&self.bounds),
            self.shard_count,
        ));
        guard.insert(key, Arc::clone(&series));
        self.series_count.fetch_add(1, Ordering::Relaxed);
        series
    }

    /// Map a canonicalized label set to its index shard via the default hasher.
    fn index_shard_for(&self, key: &DynamicLabelSet) -> usize {
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        key.hash(&mut hasher);
        (hasher.finish() as usize) & self.shard_mask
    }

    /// Probe the thread-local cache for `labels`.
    ///
    /// Hits only when the histogram id matches, the labels match in the
    /// exact order given, the weak ref is still alive, and the series has
    /// not been tombstoned. An order-permuted but equal label set misses
    /// and falls back to the canonicalized index lookup.
    fn cached_series(&self, labels: &[(&str, &str)]) -> Option<Arc<HistogramSeries>> {
        SERIES_CACHE.with(|cache| {
            let cache_ref = cache.borrow();
            let entry = cache_ref.as_ref()?;
            if entry.histogram_id != self.id {
                return None;
            }
            if entry.ordered_labels.len() != labels.len() {
                return None;
            }
            for (idx, (k, v)) in labels.iter().enumerate() {
                let (ek, ev) = &entry.ordered_labels[idx];
                if ek != k || ev != v {
                    return None;
                }
            }
            let series = entry.series.upgrade()?;
            if series.is_evicted() {
                return None;
            }
            // Cache hits still refresh the staleness timestamp.
            #[cfg(feature = "eviction")]
            series.touch(current_cycle());
            Some(series)
        })
    }

    /// Overwrite the thread-local cache slot with this resolution.
    fn update_cache(&self, labels: &[(&str, &str)], series: Arc<HistogramSeries>) {
        SERIES_CACHE.with(|cache| {
            let ordered_labels = labels
                .iter()
                .map(|(k, v)| ((*k).to_string(), (*v).to_string()))
                .collect();
            *cache.borrow_mut() = Some(SeriesCacheEntry {
                histogram_id: self.id,
                ordered_labels,
                // Weak ref: the cache must not keep evicted series alive.
                series: Arc::downgrade(&series),
            });
        });
    }
}
558
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_basic_recording() {
        let hist = DynamicHistogram::new(&[10, 100, 1000], 4);
        let labels = &[("org_id", "42")];

        // One value per bucket, including the +Inf overflow bucket.
        for value in [5u64, 50, 500, 5000] {
            hist.record(labels, value);
        }

        let buckets = hist.buckets_cumulative(labels);
        let expected = [(10, 1), (100, 2), (1000, 3), (u64::MAX, 4)];
        assert_eq!(buckets.len(), expected.len());
        for (got, want) in buckets.iter().zip(expected.iter()) {
            assert_eq!(got, want);
        }

        assert_eq!(hist.count(labels), 4);
        assert_eq!(hist.sum(labels), 5555);
    }

    #[test]
    fn test_label_order_is_canonicalized() {
        let hist = DynamicHistogram::new(&[10, 100], 4);

        // Record with one ordering, query with the reverse ordering.
        hist.record(&[("org_id", "42"), ("endpoint", "abc")], 5);
        assert_eq!(hist.count(&[("endpoint", "abc"), ("org_id", "42")]), 1);
    }

    #[test]
    fn test_series_handle() {
        let hist = DynamicHistogram::new(&[10, 100, 1000], 4);
        let series = hist.series(&[("org_id", "42")]);

        for value in [5u64, 50, 500] {
            series.record(value);
        }

        assert_eq!(series.count(), 3);
        assert_eq!(series.sum(), 555);
        // The handle and the map-based API observe the same series.
        assert_eq!(hist.count(&[("org_id", "42")]), 3);
    }

    #[test]
    fn test_multiple_label_sets() {
        let hist = DynamicHistogram::new(&[100], 4);

        hist.record(&[("org_id", "1")], 50);
        hist.record(&[("org_id", "2")], 150);

        assert_eq!(hist.count(&[("org_id", "1")]), 1);
        assert_eq!(hist.count(&[("org_id", "2")]), 1);
        assert_eq!(hist.snapshot().len(), 2);
    }

    #[test]
    fn test_overflow_bucket_routes_new_series_at_capacity() {
        let hist = DynamicHistogram::with_limits(&[100], 4, 1);

        hist.record(&[("org_id", "1")], 50);
        // Second distinct label set exceeds the cap of 1 -> overflow series.
        hist.record(&[("org_id", "2")], 150);

        // The overflow series itself counts toward cardinality.
        assert_eq!(hist.cardinality(), 2);
        let overflow = &[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)];
        assert_eq!(hist.count(overflow), 1);
        assert_eq!(hist.sum(overflow), 150);
    }

    #[test]
    fn test_concurrent_cap_bounded_overshoot() {
        use std::sync::{Arc, Barrier};
        use std::thread;

        let cap = 10;
        let threads = 16;
        let hist = Arc::new(DynamicHistogram::with_limits(&[100, 1000], 4, cap));
        let barrier = Arc::new(Barrier::new(threads));

        let workers: Vec<_> = (0..threads)
            .map(|t| {
                let hist = Arc::clone(&hist);
                let barrier = Arc::clone(&barrier);
                thread::spawn(move || {
                    barrier.wait();
                    for i in 0..5 {
                        let label = format!("t{t}_s{i}");
                        hist.record(&[("key", &label)], 42);
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        // The lock-free cap check allows at most ~one overshoot per
        // in-flight writer, plus the overflow series itself.
        let card = hist.cardinality();
        assert!(
            card <= cap + threads + 1,
            "cardinality {card} exceeded bounded overshoot (cap={cap}, threads={threads})"
        );
        assert!(hist.overflow_count() > 0, "overflow should have triggered");
    }
}