1use super::cache::{CacheableSeries, LabelCache, SERIES_CACHE_SIZE};
4#[cfg(feature = "eviction")]
5use super::current_cycle;
6use super::{DynamicLabelSet, HISTOGRAM_IDS, thread_id};
7use crossbeam_utils::CachePadded;
8use parking_lot::RwLock;
9use std::cell::RefCell;
10use std::collections::HashMap;
11use std::hash::{Hash, Hasher};
12#[cfg(feature = "eviction")]
13use std::sync::atomic::AtomicU32;
14use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicU64, AtomicUsize, Ordering};
15use std::sync::{Arc, Weak};
16
// Default cap on distinct label sets per histogram before new series are
// redirected to the overflow series.
const DEFAULT_MAX_SERIES: usize = 2000;
// Reserved label pair naming the synthetic overflow series that absorbs
// recordings once `max_series` is reached.
const OVERFLOW_LABEL_KEY: &str = "__ft_overflow";
const OVERFLOW_LABEL_VALUE: &str = "true";

// One shard of the label-set -> series index; cache-line padded to avoid
// false sharing between adjacent shards' locks.
type HistogramIndexShard = CachePadded<RwLock<HashMap<DynamicLabelSet, Arc<HistogramSeries>>>>;
// Snapshot row: (labels, cumulative buckets as (upper_bound, count), sum, count).
type HistogramSnapshotEntry = (DynamicLabelSet, Vec<(u64, u64)>, u64, u64);
23
24struct ShardedCounter {
25 cells: Vec<CachePadded<AtomicIsize>>,
26}
27
28impl ShardedCounter {
29 fn new(shard_count: usize) -> Self {
30 Self {
31 cells: (0..shard_count)
32 .map(|_| CachePadded::new(AtomicIsize::new(0)))
33 .collect(),
34 }
35 }
36
37 #[inline]
38 fn add_at(&self, shard_idx: usize, value: isize) {
39 self.cells[shard_idx].fetch_add(value, Ordering::Relaxed);
40 }
41
42 #[inline]
43 fn inc_at(&self, shard_idx: usize) {
44 self.add_at(shard_idx, 1);
45 }
46
47 #[inline]
48 fn sum(&self) -> isize {
49 self.cells
50 .iter()
51 .map(|cell| cell.load(Ordering::Relaxed))
52 .sum()
53 }
54}
55
/// Backing storage for one labelled histogram series.
struct HistogramSeries {
    // Upper bucket bounds, shared with the owning `DynamicHistogram`.
    bounds: Arc<Vec<u64>>,
    // One sharded counter per bound plus one trailing counter for the
    // implicit +Inf bucket (so `bounds.len() + 1` entries).
    buckets: Vec<ShardedCounter>,
    // Sum of all recorded values.
    sum: ShardedCounter,
    // Total number of recorded values.
    count: ShardedCounter,
    // Set when the series is removed by eviction, so stale cached handles
    // can detect they point at a dead series.
    evicted: AtomicBool,
    // Eviction-cycle stamp of the most recent access; drives staleness checks.
    #[cfg(feature = "eviction")]
    last_accessed_cycle: AtomicU32,
}
67
impl HistogramSeries {
    /// Creates a series with one counter per bound plus the implicit +Inf
    /// bucket, stamped with the current eviction cycle.
    #[cfg(feature = "eviction")]
    fn new(bounds: Arc<Vec<u64>>, shard_count: usize, cycle: u32) -> Self {
        // `0..=bounds.len()` yields bounds.len() + 1 counters; the last one
        // is the +Inf bucket.
        let buckets = (0..=bounds.len())
            .map(|_| ShardedCounter::new(shard_count))
            .collect();
        Self {
            bounds,
            buckets,
            sum: ShardedCounter::new(shard_count),
            count: ShardedCounter::new(shard_count),
            evicted: AtomicBool::new(false),
            last_accessed_cycle: AtomicU32::new(cycle),
        }
    }

    /// Same as the eviction-enabled constructor, minus the access stamp.
    #[cfg(not(feature = "eviction"))]
    fn new(bounds: Arc<Vec<u64>>, shard_count: usize) -> Self {
        let buckets = (0..=bounds.len())
            .map(|_| ShardedCounter::new(shard_count))
            .collect();
        Self {
            bounds,
            buckets,
            sum: ShardedCounter::new(shard_count),
            count: ShardedCounter::new(shard_count),
            evicted: AtomicBool::new(false),
        }
    }

    /// True once the series has been removed by eviction.
    #[inline]
    fn is_evicted(&self) -> bool {
        self.evicted.load(Ordering::Relaxed)
    }

    /// Flags the series as evicted; cached handles check this flag.
    #[cfg(feature = "eviction")]
    fn mark_evicted(&self) {
        self.evicted.store(true, Ordering::Relaxed);
    }

    /// Records `value` into the first bucket whose bound is >= value
    /// (falling through to the +Inf bucket), and updates sum and count.
    // NOTE(review): `value as isize` wraps for values above isize::MAX —
    // presumably out of range for real inputs, but worth confirming.
    #[inline]
    fn record_at(&self, shard_idx: usize, value: u64) {
        let bucket_idx = self
            .bounds
            .iter()
            .position(|&bound| value <= bound)
            .unwrap_or(self.bounds.len());
        self.buckets[bucket_idx].inc_at(shard_idx);
        self.sum.add_at(shard_idx, value as isize);
        self.count.inc_at(shard_idx);
    }

    /// Refreshes the last-access stamp to `cycle`.
    #[cfg(feature = "eviction")]
    #[inline]
    fn touch(&self, cycle: u32) {
        self.last_accessed_cycle.store(cycle, Ordering::Relaxed);
    }

    /// Collects `(upper_bound, cumulative_count)` pairs into a Vec.
    fn buckets_cumulative(&self) -> Vec<(u64, u64)> {
        let mut result = Vec::with_capacity(self.buckets.len());
        for (bound, cumulative) in self.buckets_cumulative_iter() {
            result.push((bound, cumulative));
        }
        result
    }

    /// Lazily yields `(upper_bound, cumulative_count)` per bucket; the final
    /// pair uses `u64::MAX` as the bound for the +Inf bucket. Counts are
    /// read with relaxed atomics, so the result is approximate under
    /// concurrent writes.
    fn buckets_cumulative_iter(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
        let mut cumulative = 0i64;
        self.buckets.iter().enumerate().map(move |(i, counter)| {
            cumulative += counter.sum() as i64;
            let bound = if i < self.bounds.len() {
                self.bounds[i]
            } else {
                u64::MAX
            };
            (bound, cumulative as u64)
        })
    }

    /// Approximate sum of all recorded values.
    fn sum(&self) -> u64 {
        self.sum.sum() as u64
    }

    /// Approximate number of recorded values.
    fn count(&self) -> u64 {
        self.count.sum() as u64
    }
}
158
159impl CacheableSeries for HistogramSeries {
160 fn is_evicted(&self) -> bool {
161 self.is_evicted()
162 }
163}
164
/// A cheap, cloneable handle to one labelled series of a `DynamicHistogram`.
#[derive(Clone)]
pub struct DynamicHistogramSeries {
    series: Arc<HistogramSeries>,
    // Mask applied to `thread_id()` to pick a counter shard; valid because
    // the shard count is a power of two.
    shard_mask: usize,
}
175
/// Borrowed, read-only view over a series, handed out by
/// `DynamicHistogram::visit_series` during iteration.
#[doc(hidden)]
pub struct DynamicHistogramSeriesView<'a> {
    series: &'a HistogramSeries,
}
181
impl<'a> DynamicHistogramSeriesView<'a> {
    /// Lazily yields `(upper_bound, cumulative_count)` pairs, ending with
    /// the implicit `u64::MAX` (+Inf) bucket.
    #[doc(hidden)]
    pub fn buckets_cumulative_iter(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
        self.series.buckets_cumulative_iter()
    }

    /// Approximate sum of all values recorded into this series.
    #[doc(hidden)]
    pub fn sum(&self) -> u64 {
        self.series.sum()
    }

    /// Approximate number of values recorded into this series.
    #[doc(hidden)]
    pub fn count(&self) -> u64 {
        self.series.count()
    }
}
199
200impl DynamicHistogramSeries {
201 #[inline]
203 pub fn record(&self, value: u64) {
204 let shard_idx = thread_id() & self.shard_mask;
205 self.series.record_at(shard_idx, value);
206 }
207
208 pub fn buckets_cumulative(&self) -> Vec<(u64, u64)> {
210 self.series.buckets_cumulative()
211 }
212
213 pub fn sum(&self) -> u64 {
215 self.series.sum()
216 }
217
218 pub fn count(&self) -> u64 {
220 self.series.count()
221 }
222
223 #[inline]
225 pub fn is_evicted(&self) -> bool {
226 self.series.is_evicted()
227 }
228}
229
thread_local! {
    // Small per-thread cache mapping (histogram id, labels) to a Weak series
    // handle, letting the hot record path skip the sharded index. Weak so a
    // cached entry never keeps an evicted series alive.
    static SERIES_CACHE: RefCell<LabelCache<Weak<HistogramSeries>, SERIES_CACHE_SIZE>> =
        RefCell::new(LabelCache::new());
}
234
/// A histogram metric with dynamically created, labelled series, sharded
/// for concurrent access and capped in cardinality.
pub struct DynamicHistogram {
    // Process-unique id (from HISTOGRAM_IDS); keys the thread-local cache.
    id: usize,
    // Upper bucket bounds, shared by every series.
    bounds: Arc<Vec<u64>>,
    // Power-of-two shard count, used for both the label index and the
    // per-series counter cells.
    shard_count: usize,
    // Soft cap on distinct series; 0 disables the cap. The check is racy,
    // so concurrent creators can overshoot by a bounded amount.
    max_series: usize,
    // `shard_count - 1`; a valid mask because shard_count is a power of two.
    shard_mask: usize,
    index_shards: Vec<HistogramIndexShard>,
    // Approximate live-series count (relaxed updates), used for the cap check.
    series_count: AtomicUsize,
    // Times a new-series request was redirected to the overflow series.
    overflow_count: AtomicU64,
}
251
impl DynamicHistogram {
    /// Creates a histogram with the given bucket upper bounds and the
    /// default series cap (`DEFAULT_MAX_SERIES`).
    pub fn new(bounds: &[u64], shard_count: usize) -> Self {
        Self::with_limits(bounds, shard_count, DEFAULT_MAX_SERIES)
    }

    /// Creates a histogram with explicit bucket bounds, shard count, and
    /// series cap.
    ///
    /// `shard_count` is rounded up to the next power of two so shards can be
    /// selected with a single AND; `max_series == 0` disables the cap.
    pub fn with_limits(bounds: &[u64], shard_count: usize, max_series: usize) -> Self {
        let shard_count = shard_count.next_power_of_two();
        // Globally unique id; keys this histogram's entries in the
        // thread-local series cache.
        let id = HISTOGRAM_IDS.fetch_add(1, Ordering::Relaxed);
        Self {
            id,
            bounds: Arc::new(bounds.to_vec()),
            shard_count,
            max_series,
            shard_mask: shard_count - 1,
            index_shards: (0..shard_count)
                .map(|_| CachePadded::new(RwLock::new(HashMap::new())))
                .collect(),
            series_count: AtomicUsize::new(0),
            overflow_count: AtomicU64::new(0),
        }
    }

    /// Convenience constructor with exponential-ish latency bounds from 10
    /// to 10_000_000.
    // NOTE(review): units are not stated here — presumably microseconds;
    // confirm against callers before documenting externally.
    pub fn with_latency_buckets(shard_count: usize) -> Self {
        Self::with_limits(
            &[
                10, 50, 100, 500, 1_000, 5_000, 10_000, 50_000, 100_000, 500_000, 1_000_000, 5_000_000, 10_000_000,
            ],
            shard_count,
            DEFAULT_MAX_SERIES,
        )
    }

    /// Returns a cheap handle to the series for `labels`, creating it on
    /// first use and refreshing the thread-local cache so later lookups
    /// skip the sharded index.
    pub fn series(&self, labels: &[(&str, &str)]) -> DynamicHistogramSeries {
        if let Some(series) = self.cached_series(labels) {
            return DynamicHistogramSeries {
                series,
                shard_mask: self.shard_mask,
            };
        }
        let series = self.lookup_or_create(labels);
        self.update_cache(labels, &series);
        DynamicHistogramSeries {
            series,
            shard_mask: self.shard_mask,
        }
    }

    /// Records `value` into the series for `labels`.
    ///
    /// Fast path: thread-local cache hit, no locking. Slow path: sharded
    /// index lookup (creating the series if needed), then a cache refill.
    #[inline]
    pub fn record(&self, labels: &[(&str, &str)], value: u64) {
        if let Some(series) = self.cached_series(labels) {
            let shard_idx = thread_id() & self.shard_mask;
            series.record_at(shard_idx, value);
            return;
        }

        let series = self.lookup_or_create(labels);
        self.update_cache(labels, &series);
        let shard_idx = thread_id() & self.shard_mask;
        series.record_at(shard_idx, value);
    }

    /// Returns `(upper_bound, cumulative_count)` pairs for `labels`, or an
    /// empty Vec if the series does not exist. Never creates a series.
    pub fn buckets_cumulative(&self, labels: &[(&str, &str)]) -> Vec<(u64, u64)> {
        let key = DynamicLabelSet::from_pairs(labels);
        let index_shard = self.index_shard_for(&key);
        self.index_shards[index_shard]
            .read()
            .get(&key)
            .map(|series| series.buckets_cumulative())
            .unwrap_or_default()
    }

    /// Approximate sum of recorded values for `labels` (0 if the series
    /// does not exist). Never creates a series.
    pub fn sum(&self, labels: &[(&str, &str)]) -> u64 {
        let key = DynamicLabelSet::from_pairs(labels);
        let index_shard = self.index_shard_for(&key);
        self.index_shards[index_shard]
            .read()
            .get(&key)
            .map(|series| series.sum())
            .unwrap_or(0)
    }

    /// Approximate count of recorded values for `labels` (0 if the series
    /// does not exist). Never creates a series.
    pub fn count(&self, labels: &[(&str, &str)]) -> u64 {
        let key = DynamicLabelSet::from_pairs(labels);
        let index_shard = self.index_shard_for(&key);
        self.index_shards[index_shard]
            .read()
            .get(&key)
            .map(|series| series.count())
            .unwrap_or(0)
    }

    /// Copies out every series as (labels, cumulative buckets, sum, count).
    /// Each shard is read-locked in turn, so the snapshot is not a single
    /// consistent point in time across shards.
    pub fn snapshot(&self) -> Vec<HistogramSnapshotEntry> {
        let mut out = Vec::new();
        for shard in &self.index_shards {
            let guard = shard.read();
            for (labels, series) in guard.iter() {
                out.push((
                    labels.clone(),
                    series.buckets_cumulative(),
                    series.sum(),
                    series.count(),
                ));
            }
        }
        out
    }

    /// Number of live series across all index shards (takes each shard's
    /// read lock in turn).
    pub fn cardinality(&self) -> usize {
        self.index_shards
            .iter()
            .map(|shard| shard.read().len())
            .sum()
    }

    /// How many new-series requests were redirected to the overflow series
    /// because the cap was reached.
    pub fn overflow_count(&self) -> u64 {
        self.overflow_count.load(Ordering::Relaxed)
    }

    /// Visits every series without copying bucket data; `f` receives the
    /// label pairs and a read-only view. Each shard's read lock is held
    /// while its series are visited, so `f` must not re-enter this
    /// histogram's write paths.
    #[doc(hidden)]
    pub fn visit_series<F>(&self, mut f: F)
    where
        F: for<'a> FnMut(&'a [(String, String)], DynamicHistogramSeriesView<'a>),
    {
        for shard in &self.index_shards {
            let guard = shard.read();
            for (labels, series) in guard.iter() {
                f(labels.pairs(), DynamicHistogramSeriesView { series });
            }
        }
    }

    /// Removes series not touched for more than `max_staleness` eviction
    /// cycles, returning how many were removed. Series with outstanding
    /// strong handles (Arc count > 1) are never evicted, since a live
    /// handle could still record into them.
    #[cfg(feature = "eviction")]
    pub fn evict_stale(&self, max_staleness: u32) -> usize {
        let cycle = current_cycle();
        let mut removed = 0;

        for shard in &self.index_shards {
            let mut guard = shard.write();
            guard.retain(|_labels, series| {
                if Arc::strong_count(series) > 1 {
                    // A DynamicHistogramSeries handle still points here.
                    return true;
                }
                let last = series.last_accessed_cycle.load(Ordering::Relaxed);
                let stale = cycle.saturating_sub(last) > max_staleness;
                if stale {
                    // Flag the corpse so thread-local caches drop it.
                    series.mark_evicted();
                    removed += 1;
                    self.series_count.fetch_sub(1, Ordering::Relaxed);
                }
                !stale
            });
        }

        removed
    }

    /// Finds or creates the series for `labels`.
    ///
    /// Lock discipline: optimistic read on the requested key's shard; then,
    /// after possibly redirecting to the overflow key, a read on the target
    /// shard; finally a write lock with a double-check before inserting, so
    /// racing callers converge on one Arc.
    // NOTE(review): the cap check and the series_count increment are not
    // atomic together, so concurrent creators can overshoot `max_series`
    // by a bounded amount (exercised by a test below).
    fn lookup_or_create(&self, labels: &[(&str, &str)]) -> Arc<HistogramSeries> {
        let requested_key = DynamicLabelSet::from_pairs(labels);
        let requested_shard = self.index_shard_for(&requested_key);
        #[cfg(feature = "eviction")]
        let cycle = current_cycle();

        if let Some(series) = self.index_shards[requested_shard]
            .read()
            .get(&requested_key)
        {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }

        // At capacity, route the new label set to the shared overflow series.
        let key = if self.max_series > 0
            && self.series_count.load(Ordering::Relaxed) >= self.max_series
        {
            self.overflow_count.fetch_add(1, Ordering::Relaxed);
            DynamicLabelSet::from_pairs(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)])
        } else {
            requested_key
        };
        let shard = self.index_shard_for(&key);

        if let Some(series) = self.index_shards[shard].read().get(&key) {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }

        let mut guard = self.index_shards[shard].write();
        // Double-check under the write lock: another thread may have
        // inserted between our read and write acquisitions.
        if let Some(series) = guard.get(&key) {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }
        #[cfg(feature = "eviction")]
        let series = Arc::new(HistogramSeries::new(
            Arc::clone(&self.bounds),
            self.shard_count,
            cycle,
        ));
        #[cfg(not(feature = "eviction"))]
        let series = Arc::new(HistogramSeries::new(
            Arc::clone(&self.bounds),
            self.shard_count,
        ));
        guard.insert(key, Arc::clone(&series));
        self.series_count.fetch_add(1, Ordering::Relaxed);
        series
    }

    /// Hashes a label set to an index-shard slot; masking is a valid modulo
    /// because the shard count is a power of two.
    fn index_shard_for(&self, key: &DynamicLabelSet) -> usize {
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        key.hash(&mut hasher);
        (hasher.finish() as usize) & self.shard_mask
    }

    /// Looks up `labels` in this thread's cache; refreshes the eviction
    /// stamp on a hit so cached series do not look stale.
    fn cached_series(&self, labels: &[(&str, &str)]) -> Option<Arc<HistogramSeries>> {
        SERIES_CACHE.with(|cache| {
            let series = cache.borrow_mut().get(self.id, labels)?;
            #[cfg(feature = "eviction")]
            series.touch(current_cycle());
            Some(series)
        })
    }

    /// Stores a Weak handle in this thread's cache, so the cache never
    /// keeps a series alive past eviction.
    fn update_cache(&self, labels: &[(&str, &str)], series: &Arc<HistogramSeries>) {
        SERIES_CACHE.with(|cache| {
            cache
                .borrow_mut()
                .insert(self.id, labels, Arc::downgrade(series));
        });
    }
}
539
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_basic_recording() {
        let hist = DynamicHistogram::new(&[10, 100, 1000], 4);
        let labels = &[("org_id", "42")];

        // One value per bucket, including the +Inf bucket.
        for v in [5u64, 50, 500, 5000] {
            hist.record(labels, v);
        }

        assert_eq!(
            hist.buckets_cumulative(labels),
            vec![(10, 1), (100, 2), (1000, 3), (u64::MAX, 4)]
        );
        assert_eq!(hist.count(labels), 4);
        assert_eq!(hist.sum(labels), 5 + 50 + 500 + 5000);
    }

    #[test]
    fn test_label_order_is_canonicalized() {
        let hist = DynamicHistogram::new(&[10, 100], 4);

        hist.record(&[("org_id", "42"), ("endpoint", "abc")], 5);

        // Looking up with the pairs reversed must hit the same series.
        assert_eq!(hist.count(&[("endpoint", "abc"), ("org_id", "42")]), 1);
    }

    #[test]
    fn test_series_handle() {
        let hist = DynamicHistogram::new(&[10, 100, 1000], 4);
        let handle = hist.series(&[("org_id", "42")]);

        for v in [5u64, 50, 500] {
            handle.record(v);
        }

        assert_eq!(handle.count(), 3);
        assert_eq!(handle.sum(), 555);
        // The parent histogram observes the same data.
        assert_eq!(hist.count(&[("org_id", "42")]), 3);
    }

    #[test]
    fn test_multiple_label_sets() {
        let hist = DynamicHistogram::new(&[100], 4);

        hist.record(&[("org_id", "1")], 50);
        hist.record(&[("org_id", "2")], 150);

        assert_eq!(hist.count(&[("org_id", "1")]), 1);
        assert_eq!(hist.count(&[("org_id", "2")]), 1);
        assert_eq!(hist.snapshot().len(), 2);
    }

    #[test]
    fn test_overflow_bucket_routes_new_series_at_capacity() {
        // Cap of one: the second label set must land in the overflow series.
        let hist = DynamicHistogram::with_limits(&[100], 4, 1);

        hist.record(&[("org_id", "1")], 50);
        hist.record(&[("org_id", "2")], 150);

        // One real series plus the overflow series.
        assert_eq!(hist.cardinality(), 2);
        assert_eq!(hist.count(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)]), 1);
        assert_eq!(hist.sum(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)]), 150);
    }

    #[test]
    fn test_concurrent_cap_bounded_overshoot() {
        use std::sync::{Arc, Barrier};
        use std::thread;

        let cap = 10;
        let threads = 16;
        let hist = Arc::new(DynamicHistogram::with_limits(&[100, 1000], 4, cap));
        let start = Arc::new(Barrier::new(threads));

        let workers: Vec<_> = (0..threads)
            .map(|t| {
                let hist = Arc::clone(&hist);
                let start = Arc::clone(&start);
                thread::spawn(move || {
                    start.wait();
                    for i in 0..5 {
                        let label = format!("t{t}_s{i}");
                        hist.record(&[("key", &label)], 42);
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        // The cap check is racy by design: allow one overshoot slot per
        // racing thread plus the overflow series itself.
        let card = hist.cardinality();
        assert!(
            card <= cap + threads + 1,
            "cardinality {card} exceeded bounded overshoot (cap={cap}, threads={threads})"
        );
        assert!(hist.overflow_count() > 0, "overflow should have triggered");
    }
}