1use super::cache::{CacheableSeries, LabelCache, SERIES_CACHE_SIZE};
4#[cfg(feature = "eviction")]
5use super::current_cycle;
6use super::{DynamicLabelSet, HISTOGRAM_IDS, thread_id};
7use crossbeam_utils::CachePadded;
8use parking_lot::RwLock;
9use std::cell::RefCell;
10use std::collections::HashMap;
11use std::hash::{Hash, Hasher};
12#[cfg(feature = "eviction")]
13use std::sync::atomic::AtomicU32;
14use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicU64, AtomicUsize, Ordering};
15use std::sync::{Arc, Weak};
16
/// Default cap on distinct label sets per histogram before new series are
/// routed to the shared overflow series.
const DEFAULT_MAX_SERIES: usize = 2000;
/// Reserved label key for the synthetic overflow series used once the
/// series cap is reached.
const OVERFLOW_LABEL_KEY: &str = "__ft_overflow";
/// Reserved label value paired with `OVERFLOW_LABEL_KEY`.
const OVERFLOW_LABEL_VALUE: &str = "true";

/// One lock-sharded slice of the label-set -> series index, padded to avoid
/// false sharing between shard locks.
type HistogramIndexShard = CachePadded<RwLock<HashMap<DynamicLabelSet, Arc<HistogramSeries>>>>;
/// Snapshot row: (labels, cumulative `(bound, count)` buckets, sum, count).
type HistogramSnapshotEntry = (DynamicLabelSet, Vec<(u64, u64)>, u64, u64);
23
/// A counter split across cache-line-padded atomic cells so concurrent
/// writers on different shards neither contend nor false-share.
struct ShardedCounter {
    // One cell per shard; readers sum every cell to get the total.
    cells: Vec<CachePadded<AtomicIsize>>,
}
27
28impl ShardedCounter {
29 fn new(shard_count: usize) -> Self {
30 Self {
31 cells: (0..shard_count)
32 .map(|_| CachePadded::new(AtomicIsize::new(0)))
33 .collect(),
34 }
35 }
36
37 #[inline]
38 fn add_at(&self, shard_idx: usize, value: isize) {
39 self.cells[shard_idx].fetch_add(value, Ordering::Relaxed);
40 }
41
42 #[inline]
43 fn inc_at(&self, shard_idx: usize) {
44 self.add_at(shard_idx, 1);
45 }
46
47 #[inline]
48 fn sum(&self) -> isize {
49 self.cells
50 .iter()
51 .map(|cell| cell.load(Ordering::Relaxed))
52 .sum()
53 }
54}
55
/// Shared storage for one labelled histogram series.
struct HistogramSeries {
    // Upper bucket bounds; an implicit +Inf bucket is appended after them.
    bounds: Arc<Vec<u64>>,
    // One sharded counter per bound plus one for the +Inf overflow bucket.
    buckets: Vec<ShardedCounter>,
    // Sharded running sum of every recorded value.
    sum: ShardedCounter,
    // Sharded count of recorded values.
    count: ShardedCounter,
    // Set when the series is removed from the histogram index so stale
    // cached handles can detect it.
    evicted: AtomicBool,
    // Eviction-cycle stamp of the most recent access.
    #[cfg(feature = "eviction")]
    last_accessed_cycle: AtomicU32,
}
67
68impl HistogramSeries {
69 #[cfg(feature = "eviction")]
70 fn new(bounds: Arc<Vec<u64>>, shard_count: usize, cycle: u32) -> Self {
71 let buckets = (0..=bounds.len())
72 .map(|_| ShardedCounter::new(shard_count))
73 .collect();
74 Self {
75 bounds,
76 buckets,
77 sum: ShardedCounter::new(shard_count),
78 count: ShardedCounter::new(shard_count),
79 evicted: AtomicBool::new(false),
80 last_accessed_cycle: AtomicU32::new(cycle),
81 }
82 }
83
84 #[cfg(not(feature = "eviction"))]
85 fn new(bounds: Arc<Vec<u64>>, shard_count: usize) -> Self {
86 let buckets = (0..=bounds.len())
87 .map(|_| ShardedCounter::new(shard_count))
88 .collect();
89 Self {
90 bounds,
91 buckets,
92 sum: ShardedCounter::new(shard_count),
93 count: ShardedCounter::new(shard_count),
94 evicted: AtomicBool::new(false),
95 }
96 }
97
98 #[inline]
99 fn is_evicted(&self) -> bool {
100 self.evicted.load(Ordering::Relaxed)
101 }
102
103 #[cfg(feature = "eviction")]
104 fn mark_evicted(&self) {
105 self.evicted.store(true, Ordering::Relaxed);
106 }
107
108 #[inline]
109 fn record_at(&self, shard_idx: usize, value: u64) {
110 let bucket_idx = self
111 .bounds
112 .iter()
113 .position(|&bound| value <= bound)
114 .unwrap_or(self.bounds.len());
115 self.buckets[bucket_idx].inc_at(shard_idx);
116 self.sum.add_at(shard_idx, value as isize);
117 self.count.inc_at(shard_idx);
118 }
121
122 #[cfg(feature = "eviction")]
124 #[inline]
125 fn touch(&self, cycle: u32) {
126 self.last_accessed_cycle.store(cycle, Ordering::Relaxed);
127 }
128
129 fn buckets_cumulative(&self) -> Vec<(u64, u64)> {
130 let mut result = Vec::with_capacity(self.buckets.len());
131 for (bound, cumulative) in self.buckets_cumulative_iter() {
132 result.push((bound, cumulative));
133 }
134 result
135 }
136
137 fn buckets_cumulative_iter(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
138 let mut cumulative = 0i64;
139 self.buckets.iter().enumerate().map(move |(i, counter)| {
140 cumulative += counter.sum() as i64;
141 let bound = if i < self.bounds.len() {
142 self.bounds[i]
143 } else {
144 u64::MAX
145 };
146 (bound, cumulative as u64)
147 })
148 }
149
150 fn sum(&self) -> u64 {
151 self.sum.sum() as u64
152 }
153
154 fn count(&self) -> u64 {
155 self.count.sum() as u64
156 }
157}
158
159impl CacheableSeries for HistogramSeries {
160 fn is_evicted(&self) -> bool {
161 self.is_evicted()
162 }
163}
164
/// Cheap, cloneable handle to one labelled series; recording through it
/// bypasses the label lookup entirely.
#[derive(Clone)]
pub struct DynamicHistogramSeries {
    // Shared storage, kept alive by this handle even across eviction.
    series: Arc<HistogramSeries>,
    // Copied from the owning histogram so `record` can pick a write shard.
    shard_mask: usize,
}
175
/// Borrowed read-only view over a series, handed out by
/// `DynamicHistogram::visit_series` to avoid cloning snapshot data.
#[doc(hidden)]
pub struct DynamicHistogramSeriesView<'a> {
    series: &'a HistogramSeries,
}
181
182impl<'a> DynamicHistogramSeriesView<'a> {
183 #[doc(hidden)]
185 pub fn buckets_cumulative_iter(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
186 self.series.buckets_cumulative_iter()
187 }
188
189 #[doc(hidden)]
190 pub fn sum(&self) -> u64 {
191 self.series.sum()
192 }
193
194 #[doc(hidden)]
195 pub fn count(&self) -> u64 {
196 self.series.count()
197 }
198}
199
200impl DynamicHistogramSeries {
201 #[inline]
203 pub fn record(&self, value: u64) {
204 let shard_idx = thread_id() & self.shard_mask;
205 self.series.record_at(shard_idx, value);
206 }
207
208 pub fn buckets_cumulative(&self) -> Vec<(u64, u64)> {
210 self.series.buckets_cumulative()
211 }
212
213 pub fn sum(&self) -> u64 {
215 self.series.sum()
216 }
217
218 pub fn count(&self) -> u64 {
220 self.series.count()
221 }
222
223 #[inline]
225 pub fn is_evicted(&self) -> bool {
226 self.series.is_evicted()
227 }
228}
229
thread_local! {
    /// Per-thread label -> series memo. Holds `Weak` references so a cached
    /// entry never keeps an evicted series alive on its own.
    static SERIES_CACHE: RefCell<LabelCache<Weak<HistogramSeries>, SERIES_CACHE_SIZE>> =
        RefCell::new(LabelCache::new());
}
234
/// A histogram whose label sets are discovered at runtime, with sharded
/// writes, a thread-local series cache, and a cardinality cap.
pub struct DynamicHistogram {
    // Unique id namespacing this histogram's entries in SERIES_CACHE.
    id: usize,
    // Bucket upper bounds shared by every series.
    bounds: Arc<Vec<u64>>,
    // Write-shard count (power of two) used for per-series counters.
    shard_count: usize,
    // Cap on distinct series before overflow routing kicks in (0 = no cap).
    max_series: usize,
    // shard_count - 1; lets `& shard_mask` replace a modulo.
    shard_mask: usize,
    // Lock-sharded label-set -> series index.
    index_shards: Vec<HistogramIndexShard>,
    // Approximate live-series count, used for the cap check.
    series_count: AtomicUsize,
    // Number of lookups redirected to the overflow series.
    overflow_count: AtomicU64,
}
251
impl DynamicHistogram {
    /// Creates a histogram with the given bucket bounds and the default
    /// series cap (`DEFAULT_MAX_SERIES`).
    pub fn new(bounds: &[u64], shard_count: usize) -> Self {
        Self::with_limits(bounds, shard_count, DEFAULT_MAX_SERIES)
    }

    /// Creates a histogram with explicit bucket bounds, write-shard count
    /// (rounded up to a power of two) and series-cardinality cap.
    ///
    /// NOTE(review): `bounds` is assumed sorted ascending — bucket selection
    /// and cumulative output rely on it; confirm callers uphold this.
    pub fn with_limits(bounds: &[u64], shard_count: usize, max_series: usize) -> Self {
        // Power-of-two shard count lets `& shard_mask` replace a modulo.
        let shard_count = shard_count.next_power_of_two();
        // Globally unique id, used to namespace thread-local cache entries.
        let id = HISTOGRAM_IDS.fetch_add(1, Ordering::Relaxed);
        Self {
            id,
            bounds: Arc::new(bounds.to_vec()),
            shard_count,
            max_series,
            shard_mask: shard_count - 1,
            // The index reuses shard_count for its lock-shard count too.
            index_shards: (0..shard_count)
                .map(|_| CachePadded::new(RwLock::new(HashMap::new())))
                .collect(),
            series_count: AtomicUsize::new(0),
            overflow_count: AtomicU64::new(0),
        }
    }

    /// Convenience constructor using a fixed ladder of latency-oriented
    /// bounds from 10 to 10_000_000 (units are the caller's convention).
    pub fn with_latency_buckets(shard_count: usize) -> Self {
        Self::with_limits(
            &[
                10, 50, 100, 500, 1_000, 5_000, 10_000, 50_000, 100_000,
                500_000, 1_000_000, 5_000_000, 10_000_000,
            ],
            shard_count,
            DEFAULT_MAX_SERIES,
        )
    }

    /// Returns a cheap handle for `labels`, creating the series on first
    /// use and memoizing it in the thread-local cache.
    pub fn series(&self, labels: &[(&str, &str)]) -> DynamicHistogramSeries {
        // Fast path: thread-local cache hit, no locks taken.
        if let Some(series) = self.cached_series(labels) {
            return DynamicHistogramSeries {
                series,
                shard_mask: self.shard_mask,
            };
        }
        let series = self.lookup_or_create(labels);
        self.update_cache(labels, &series);
        DynamicHistogramSeries {
            series,
            shard_mask: self.shard_mask,
        }
    }

    /// Records `value` against `labels`, using the thread-local cache to
    /// skip the sharded index on the hot path.
    #[inline]
    pub fn record(&self, labels: &[(&str, &str)], value: u64) {
        if let Some(series) = self.cached_series(labels) {
            let shard_idx = thread_id() & self.shard_mask;
            series.record_at(shard_idx, value);
            return;
        }

        // Cache miss: consult the index, then prime the cache for next time.
        let series = self.lookup_or_create(labels);
        self.update_cache(labels, &series);
        let shard_idx = thread_id() & self.shard_mask;
        series.record_at(shard_idx, value);
    }

    /// Cumulative `(bound, count)` buckets for `labels`; empty when no such
    /// series exists.
    pub fn buckets_cumulative(&self, labels: &[(&str, &str)]) -> Vec<(u64, u64)> {
        let key = DynamicLabelSet::from_pairs(labels);
        let index_shard = self.index_shard_for(&key);
        self.index_shards[index_shard]
            .read()
            .get(&key)
            .map(|series| series.buckets_cumulative())
            .unwrap_or_default()
    }

    /// Sum of values recorded for `labels`, or 0 if the series is unknown.
    pub fn sum(&self, labels: &[(&str, &str)]) -> u64 {
        let key = DynamicLabelSet::from_pairs(labels);
        let index_shard = self.index_shard_for(&key);
        self.index_shards[index_shard]
            .read()
            .get(&key)
            .map(|series| series.sum())
            .unwrap_or(0)
    }

    /// Count of values recorded for `labels`, or 0 if the series is unknown.
    pub fn count(&self, labels: &[(&str, &str)]) -> u64 {
        let key = DynamicLabelSet::from_pairs(labels);
        let index_shard = self.index_shard_for(&key);
        self.index_shards[index_shard]
            .read()
            .get(&key)
            .map(|series| series.count())
            .unwrap_or(0)
    }

    /// Copies every live series into owned snapshot rows. Each shard is read
    /// under its own lock, so the snapshot is per-shard consistent only.
    pub fn snapshot(&self) -> Vec<HistogramSnapshotEntry> {
        let mut out = Vec::new();
        for shard in &self.index_shards {
            let guard = shard.read();
            for (labels, series) in guard.iter() {
                out.push((
                    labels.clone(),
                    series.buckets_cumulative(),
                    series.sum(),
                    series.count(),
                ));
            }
        }
        out
    }

    /// Number of live series across all index shards.
    pub fn cardinality(&self) -> usize {
        self.index_shards
            .iter()
            .map(|shard| shard.read().len())
            .sum()
    }

    /// How many lookups were redirected to the overflow series because the
    /// `max_series` cap was reached.
    pub fn overflow_count(&self) -> u64 {
        self.overflow_count.load(Ordering::Relaxed)
    }

    /// Visits every series with its label pairs without building an owned
    /// snapshot; the callback runs while the shard's read lock is held.
    #[doc(hidden)]
    pub fn visit_series<F>(&self, mut f: F)
    where
        F: for<'a> FnMut(&'a [(String, String)], DynamicHistogramSeriesView<'a>),
    {
        for shard in &self.index_shards {
            let guard = shard.read();
            for (labels, series) in guard.iter() {
                f(labels.pairs(), DynamicHistogramSeriesView { series });
            }
        }
    }

    /// Removes series not accessed within `max_staleness` cycles and returns
    /// how many were removed. Series still held by outstanding handles are
    /// kept regardless of staleness.
    #[cfg(feature = "eviction")]
    pub fn evict_stale(&self, max_staleness: u32) -> usize {
        let cycle = current_cycle();
        let mut removed = 0;

        for shard in &self.index_shards {
            let mut guard = shard.write();
            guard.retain(|_labels, series| {
                // A strong count above 1 means a live handle exists outside
                // the index; never evict those.
                if Arc::strong_count(series) > 1 {
                    return true;
                }
                let last = series.last_accessed_cycle.load(Ordering::Relaxed);
                let stale = cycle.saturating_sub(last) > max_staleness;
                if stale {
                    // Flag before removal so thread-local caches holding a
                    // weak reference can detect the dead entry.
                    series.mark_evicted();
                    removed += 1;
                    self.series_count.fetch_sub(1, Ordering::Relaxed);
                }
                !stale
            });
        }

        removed
    }

    /// Finds the series for `labels`, creating it on demand. Once the cap is
    /// reached, new label sets are routed to the shared overflow series.
    /// The cap check is racy by design: concurrent creators may briefly
    /// overshoot by a bounded amount (see the concurrency test below).
    fn lookup_or_create(&self, labels: &[(&str, &str)]) -> Arc<HistogramSeries> {
        let requested_key = DynamicLabelSet::from_pairs(labels);
        let requested_shard = self.index_shard_for(&requested_key);
        #[cfg(feature = "eviction")]
        let cycle = current_cycle();

        // Fast path: the exact series already exists.
        if let Some(series) = self.index_shards[requested_shard]
            .read()
            .get(&requested_key)
        {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }

        // At capacity (max_series == 0 disables the cap), redirect this new
        // label set to the overflow series and count the redirection.
        let key = if self.max_series > 0
            && self.series_count.load(Ordering::Relaxed) >= self.max_series
        {
            self.overflow_count.fetch_add(1, Ordering::Relaxed);
            DynamicLabelSet::from_pairs(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)])
        } else {
            requested_key
        };
        // The key may have changed, so re-derive its shard.
        let shard = self.index_shard_for(&key);

        // Second read-probe: the (possibly overflow) key may already exist.
        if let Some(series) = self.index_shards[shard].read().get(&key) {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }

        // Double-checked insert: re-check under the write lock because
        // another thread may have created the series after the read probe.
        let mut guard = self.index_shards[shard].write();
        if let Some(series) = guard.get(&key) {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }
        #[cfg(feature = "eviction")]
        let series = Arc::new(HistogramSeries::new(
            Arc::clone(&self.bounds),
            self.shard_count,
            cycle,
        ));
        #[cfg(not(feature = "eviction"))]
        let series = Arc::new(HistogramSeries::new(
            Arc::clone(&self.bounds),
            self.shard_count,
        ));
        guard.insert(key, Arc::clone(&series));
        self.series_count.fetch_add(1, Ordering::Relaxed);
        series
    }

    /// Maps a label set to its index shard by hashing the canonical key.
    fn index_shard_for(&self, key: &DynamicLabelSet) -> usize {
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        key.hash(&mut hasher);
        (hasher.finish() as usize) & self.shard_mask
    }

    /// Probes the thread-local cache; on a hit, refreshes the access stamp.
    fn cached_series(&self, labels: &[(&str, &str)]) -> Option<Arc<HistogramSeries>> {
        SERIES_CACHE.with(|cache| {
            let series = cache.borrow_mut().get(self.id, labels)?;
            #[cfg(feature = "eviction")]
            series.touch(current_cycle());
            Some(series)
        })
    }

    /// Stores a weak reference in the thread-local cache so the cache never
    /// extends a series' lifetime on its own.
    fn update_cache(&self, labels: &[(&str, &str)], series: &Arc<HistogramSeries>) {
        SERIES_CACHE.with(|cache| {
            cache
                .borrow_mut()
                .insert(self.id, labels, Arc::downgrade(series));
        });
    }
}
537
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_basic_recording() {
        let h = DynamicHistogram::new(&[10, 100, 1000], 4);
        let labels = &[("org_id", "42")];

        // One value per bucket, including the +Inf overflow bucket.
        for v in [5, 50, 500, 5000] {
            h.record(labels, v);
        }

        let buckets = h.buckets_cumulative(labels);
        assert_eq!(buckets.len(), 4);
        assert_eq!(buckets[0], (10, 1));
        assert_eq!(buckets[1], (100, 2));
        assert_eq!(buckets[2], (1000, 3));
        assert_eq!(buckets[3], (u64::MAX, 4));

        assert_eq!(h.count(labels), 4);
        assert_eq!(h.sum(labels), 5 + 50 + 500 + 5000);
    }

    #[test]
    fn test_label_order_is_canonicalized() {
        let h = DynamicHistogram::new(&[10, 100], 4);

        h.record(&[("org_id", "42"), ("endpoint", "abc")], 5);

        // Looking up with the pairs swapped must hit the same series.
        assert_eq!(h.count(&[("endpoint", "abc"), ("org_id", "42")]), 1);
    }

    #[test]
    fn test_series_handle() {
        let h = DynamicHistogram::new(&[10, 100, 1000], 4);
        let handle = h.series(&[("org_id", "42")]);

        handle.record(5);
        handle.record(50);
        handle.record(500);

        assert_eq!(handle.count(), 3);
        assert_eq!(handle.sum(), 555);
        // Recordings made through the handle are visible via the parent.
        assert_eq!(h.count(&[("org_id", "42")]), 3);
    }

    #[test]
    fn test_multiple_label_sets() {
        let h = DynamicHistogram::new(&[100], 4);

        h.record(&[("org_id", "1")], 50);
        h.record(&[("org_id", "2")], 150);

        assert_eq!(h.count(&[("org_id", "1")]), 1);
        assert_eq!(h.count(&[("org_id", "2")]), 1);

        let snap = h.snapshot();
        assert_eq!(snap.len(), 2);
    }

    #[test]
    fn test_overflow_bucket_routes_new_series_at_capacity() {
        let h = DynamicHistogram::with_limits(&[100], 4, 1);

        h.record(&[("org_id", "1")], 50);
        h.record(&[("org_id", "2")], 150);

        // Cap of 1: the second label set lands in the overflow series,
        // which itself counts toward cardinality.
        assert_eq!(h.cardinality(), 2);
        assert_eq!(h.count(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)]), 1);
        assert_eq!(h.sum(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)]), 150);
    }

    #[test]
    fn test_concurrent_cap_bounded_overshoot() {
        use std::sync::{Arc, Barrier};
        use std::thread;

        let cap = 10;
        let threads = 16;
        let h = Arc::new(DynamicHistogram::with_limits(&[100, 1000], 4, cap));
        let barrier = Arc::new(Barrier::new(threads));

        // Every worker races to create 5 fresh label sets at once.
        let workers: Vec<_> = (0..threads)
            .map(|t| {
                let h = Arc::clone(&h);
                let barrier = Arc::clone(&barrier);
                thread::spawn(move || {
                    barrier.wait();
                    for i in 0..5 {
                        let label = format!("t{t}_s{i}");
                        h.record(&[("key", &label)], 42);
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        // The racy cap check may overshoot, but only boundedly so.
        let card = h.cardinality();
        assert!(
            card <= cap + threads + 1,
            "cardinality {card} exceeded bounded overshoot (cap={cap}, threads={threads})"
        );
        assert!(h.overflow_count() > 0, "overflow should have triggered");
    }
}