1#[cfg(feature = "eviction")]
4use super::current_cycle;
5use super::{DynamicLabelSet, HISTOGRAM_IDS, thread_id};
6use crossbeam_utils::CachePadded;
7use parking_lot::RwLock;
8use std::cell::RefCell;
9use std::collections::HashMap;
10use std::hash::{Hash, Hasher};
11#[cfg(feature = "eviction")]
12use std::sync::atomic::AtomicU32;
13use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicU64, AtomicUsize, Ordering};
14use std::sync::{Arc, Weak};
15
// Default cap on distinct label sets per histogram; see `with_limits`.
const DEFAULT_MAX_SERIES: usize = 2000;
// Label pair attached to the single shared overflow series that absorbs
// new label sets once the cap is reached.
const OVERFLOW_LABEL_KEY: &str = "__ft_overflow";
const OVERFLOW_LABEL_VALUE: &str = "true";

// One lock-striped shard of the label-set -> series index.
type HistogramIndexShard = CachePadded<RwLock<HashMap<DynamicLabelSet, Arc<HistogramSeries>>>>;
// (labels, cumulative buckets, sum, count) as produced by `snapshot`.
type HistogramSnapshotEntry = (DynamicLabelSet, Vec<(u64, u64)>, u64, u64);
22
23struct ShardedCounter {
24 cells: Vec<CachePadded<AtomicIsize>>,
25}
26
27impl ShardedCounter {
28 fn new(shard_count: usize) -> Self {
29 Self {
30 cells: (0..shard_count)
31 .map(|_| CachePadded::new(AtomicIsize::new(0)))
32 .collect(),
33 }
34 }
35
36 #[inline]
37 fn add_at(&self, shard_idx: usize, value: isize) {
38 self.cells[shard_idx].fetch_add(value, Ordering::Relaxed);
39 }
40
41 #[inline]
42 fn inc_at(&self, shard_idx: usize) {
43 self.add_at(shard_idx, 1);
44 }
45
46 #[inline]
47 fn sum(&self) -> isize {
48 self.cells
49 .iter()
50 .map(|cell| cell.load(Ordering::Relaxed))
51 .sum()
52 }
53}
54
/// Per-label-set histogram state. All mutation goes through relaxed
/// atomics, so one series can be recorded to from many threads at once.
struct HistogramSeries {
    // Inclusive upper bounds of the finite buckets, shared with the parent
    // histogram; the +Inf bucket is implicit.
    bounds: Arc<Vec<u64>>,
    // One sharded counter per bound plus one trailing +Inf bucket
    // (bounds.len() + 1 entries).
    buckets: Vec<ShardedCounter>,
    // Sum of all recorded values (stored in isize cells, read back as u64).
    sum: ShardedCounter,
    // Total number of recorded observations.
    count: ShardedCounter,
    // Set when the series is removed from the index so thread-local cached
    // handles can detect staleness.
    evicted: AtomicBool,
    // Last eviction cycle in which this series was touched; drives
    // staleness-based eviction.
    #[cfg(feature = "eviction")]
    last_accessed_cycle: AtomicU32,
}
66
67impl HistogramSeries {
68 #[cfg(feature = "eviction")]
69 fn new(bounds: Arc<Vec<u64>>, shard_count: usize, cycle: u32) -> Self {
70 let buckets = (0..=bounds.len())
71 .map(|_| ShardedCounter::new(shard_count))
72 .collect();
73 Self {
74 bounds,
75 buckets,
76 sum: ShardedCounter::new(shard_count),
77 count: ShardedCounter::new(shard_count),
78 evicted: AtomicBool::new(false),
79 last_accessed_cycle: AtomicU32::new(cycle),
80 }
81 }
82
83 #[cfg(not(feature = "eviction"))]
84 fn new(bounds: Arc<Vec<u64>>, shard_count: usize) -> Self {
85 let buckets = (0..=bounds.len())
86 .map(|_| ShardedCounter::new(shard_count))
87 .collect();
88 Self {
89 bounds,
90 buckets,
91 sum: ShardedCounter::new(shard_count),
92 count: ShardedCounter::new(shard_count),
93 evicted: AtomicBool::new(false),
94 }
95 }
96
97 #[inline]
98 fn is_evicted(&self) -> bool {
99 self.evicted.load(Ordering::Relaxed)
100 }
101
102 #[cfg(feature = "eviction")]
103 fn mark_evicted(&self) {
104 self.evicted.store(true, Ordering::Relaxed);
105 }
106
107 #[inline]
108 fn record_at(&self, shard_idx: usize, value: u64) {
109 let bucket_idx = self
110 .bounds
111 .iter()
112 .position(|&bound| value <= bound)
113 .unwrap_or(self.bounds.len());
114 self.buckets[bucket_idx].inc_at(shard_idx);
115 self.sum.add_at(shard_idx, value as isize);
116 self.count.inc_at(shard_idx);
117 }
120
121 #[cfg(feature = "eviction")]
123 #[inline]
124 fn touch(&self, cycle: u32) {
125 self.last_accessed_cycle.store(cycle, Ordering::Relaxed);
126 }
127
128 fn buckets_cumulative(&self) -> Vec<(u64, u64)> {
129 let mut result = Vec::with_capacity(self.buckets.len());
130 for (bound, cumulative) in self.buckets_cumulative_iter() {
131 result.push((bound, cumulative));
132 }
133 result
134 }
135
136 fn buckets_cumulative_iter(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
137 let mut cumulative = 0i64;
138 self.buckets.iter().enumerate().map(move |(i, counter)| {
139 cumulative += counter.sum() as i64;
140 let bound = if i < self.bounds.len() {
141 self.bounds[i]
142 } else {
143 u64::MAX
144 };
145 (bound, cumulative as u64)
146 })
147 }
148
149 fn sum(&self) -> u64 {
150 self.sum.sum() as u64
151 }
152
153 fn count(&self) -> u64 {
154 self.count.sum() as u64
155 }
156}
157
/// Cheap, cloneable handle to a single series; `record` through it skips
/// label hashing and index locks entirely.
#[derive(Clone)]
pub struct DynamicHistogramSeries {
    series: Arc<HistogramSeries>,
    // Copied from the parent histogram (shard_count - 1, power of two), so
    // `thread_id() & shard_mask` selects this thread's counter cell.
    shard_mask: usize,
}
168
/// Borrowed, read-only view over one series, handed out by
/// `DynamicHistogram::visit_series` during export.
#[doc(hidden)]
pub struct DynamicHistogramSeriesView<'a> {
    series: &'a HistogramSeries,
}

impl<'a> DynamicHistogramSeriesView<'a> {
    /// Cumulative `(upper_bound, count)` pairs; final bound is `u64::MAX`.
    #[doc(hidden)]
    pub fn buckets_cumulative_iter(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
        self.series.buckets_cumulative_iter()
    }

    /// Sum of all recorded values.
    #[doc(hidden)]
    pub fn sum(&self) -> u64 {
        self.series.sum()
    }

    /// Total number of recorded observations.
    #[doc(hidden)]
    pub fn count(&self) -> u64 {
        self.series.count()
    }
}
192
193impl DynamicHistogramSeries {
194 #[inline]
196 pub fn record(&self, value: u64) {
197 let shard_idx = thread_id() & self.shard_mask;
198 self.series.record_at(shard_idx, value);
199 }
200
201 pub fn buckets_cumulative(&self) -> Vec<(u64, u64)> {
203 self.series.buckets_cumulative()
204 }
205
206 pub fn sum(&self) -> u64 {
208 self.series.sum()
209 }
210
211 pub fn count(&self) -> u64 {
213 self.series.count()
214 }
215
216 #[inline]
218 pub fn is_evicted(&self) -> bool {
219 self.series.is_evicted()
220 }
221}
222
/// One-entry memo of the current thread's most recent
/// (histogram, labels) -> series lookup.
struct SeriesCacheEntry {
    // Id of the owning histogram; guards against cross-histogram hits.
    histogram_id: usize,
    // Labels exactly as the caller passed them (order preserved); a hit
    // therefore requires the same label order, not just the same set.
    ordered_labels: Vec<(String, String)>,
    // Weak so the cache never keeps an evicted series alive.
    series: Weak<HistogramSeries>,
}

thread_local! {
    // Per-thread single-slot cache: repeated recordings with identical
    // labels skip hashing and index locks entirely.
    static SERIES_CACHE: RefCell<Option<SeriesCacheEntry>> = const { RefCell::new(None) };
}
232
/// A histogram whose label sets are discovered at runtime, backed by a
/// lock-striped label index and sharded atomic counters per series.
pub struct DynamicHistogram {
    // Process-unique id (from HISTOGRAM_IDS); lets the thread-local cache
    // tell histograms apart.
    id: usize,
    // Inclusive upper bucket bounds, shared with every series.
    bounds: Arc<Vec<u64>>,
    // Power-of-two number of counter cells per series and index shards.
    shard_count: usize,
    // Soft cap on distinct series; 0 disables the cap.
    max_series: usize,
    // shard_count - 1; bitmask used for shard selection.
    shard_mask: usize,
    // Lock-striped label-set -> series index.
    index_shards: Vec<HistogramIndexShard>,
    // Approximate live-series count; checked (racily) against max_series.
    series_count: AtomicUsize,
    // Times a previously unseen label set was redirected to the overflow
    // series because the cap was reached.
    overflow_count: AtomicU64,
}
249
250impl DynamicHistogram {
251 pub fn new(bounds: &[u64], shard_count: usize) -> Self {
253 Self::with_limits(bounds, shard_count, DEFAULT_MAX_SERIES)
254 }
255
256 pub fn with_limits(bounds: &[u64], shard_count: usize, max_series: usize) -> Self {
264 let shard_count = shard_count.next_power_of_two();
265 let id = HISTOGRAM_IDS.fetch_add(1, Ordering::Relaxed);
266 Self {
267 id,
268 bounds: Arc::new(bounds.to_vec()),
269 shard_count,
270 max_series,
271 shard_mask: shard_count - 1,
272 index_shards: (0..shard_count)
273 .map(|_| CachePadded::new(RwLock::new(HashMap::new())))
274 .collect(),
275 series_count: AtomicUsize::new(0),
276 overflow_count: AtomicU64::new(0),
277 }
278 }
279
280 pub fn with_latency_buckets(shard_count: usize) -> Self {
282 Self::with_limits(
283 &[
284 10, 50, 100, 500, 1_000, 5_000, 10_000, 50_000, 100_000, 500_000, 1_000_000, 5_000_000, 10_000_000, ],
298 shard_count,
299 DEFAULT_MAX_SERIES,
300 )
301 }
302
303 pub fn series(&self, labels: &[(&str, &str)]) -> DynamicHistogramSeries {
307 if let Some(series) = self.cached_series(labels) {
308 return DynamicHistogramSeries {
309 series,
310 shard_mask: self.shard_mask,
311 };
312 }
313 let series = self.lookup_or_create(labels);
314 self.update_cache(labels, Arc::clone(&series));
315 DynamicHistogramSeries {
316 series,
317 shard_mask: self.shard_mask,
318 }
319 }
320
321 #[inline]
323 pub fn record(&self, labels: &[(&str, &str)], value: u64) {
324 if let Some(series) = self.cached_series(labels) {
325 let shard_idx = thread_id() & self.shard_mask;
326 series.record_at(shard_idx, value);
327 return;
328 }
329
330 let series = self.lookup_or_create(labels);
331 self.update_cache(labels, Arc::clone(&series));
332 let shard_idx = thread_id() & self.shard_mask;
333 series.record_at(shard_idx, value);
334 }
335
336 pub fn buckets_cumulative(&self, labels: &[(&str, &str)]) -> Vec<(u64, u64)> {
338 let key = DynamicLabelSet::from_pairs(labels);
339 let index_shard = self.index_shard_for(&key);
340 self.index_shards[index_shard]
341 .read()
342 .get(&key)
343 .map(|series| series.buckets_cumulative())
344 .unwrap_or_default()
345 }
346
347 pub fn sum(&self, labels: &[(&str, &str)]) -> u64 {
349 let key = DynamicLabelSet::from_pairs(labels);
350 let index_shard = self.index_shard_for(&key);
351 self.index_shards[index_shard]
352 .read()
353 .get(&key)
354 .map(|series| series.sum())
355 .unwrap_or(0)
356 }
357
358 pub fn count(&self, labels: &[(&str, &str)]) -> u64 {
360 let key = DynamicLabelSet::from_pairs(labels);
361 let index_shard = self.index_shard_for(&key);
362 self.index_shards[index_shard]
363 .read()
364 .get(&key)
365 .map(|series| series.count())
366 .unwrap_or(0)
367 }
368
369 pub fn snapshot(&self) -> Vec<HistogramSnapshotEntry> {
371 let mut out = Vec::new();
372 for shard in &self.index_shards {
373 let guard = shard.read();
374 for (labels, series) in guard.iter() {
375 out.push((
376 labels.clone(),
377 series.buckets_cumulative(),
378 series.sum(),
379 series.count(),
380 ));
381 }
382 }
383 out
384 }
385
386 pub fn cardinality(&self) -> usize {
388 self.index_shards
389 .iter()
390 .map(|shard| shard.read().len())
391 .sum()
392 }
393
394 pub fn overflow_count(&self) -> u64 {
399 self.overflow_count.load(Ordering::Relaxed)
400 }
401
402 #[doc(hidden)]
407 pub fn visit_series<F>(&self, mut f: F)
408 where
409 F: for<'a> FnMut(&'a [(String, String)], DynamicHistogramSeriesView<'a>),
410 {
411 for shard in &self.index_shards {
412 let guard = shard.read();
413 for (labels, series) in guard.iter() {
414 f(labels.pairs(), DynamicHistogramSeriesView { series });
415 }
416 }
417 }
418
419 #[cfg(feature = "eviction")]
430 pub fn evict_stale(&self, max_staleness: u32) -> usize {
431 let cycle = current_cycle();
432 let mut removed = 0;
433
434 for shard in &self.index_shards {
435 let mut guard = shard.write();
436 guard.retain(|_labels, series| {
437 if Arc::strong_count(series) > 1 {
440 return true;
441 }
442 let last = series.last_accessed_cycle.load(Ordering::Relaxed);
444 let stale = cycle.saturating_sub(last) > max_staleness;
445 if stale {
446 series.mark_evicted();
447 removed += 1;
448 self.series_count.fetch_sub(1, Ordering::Relaxed);
449 }
450 !stale
451 });
452 }
453
454 removed
455 }
456
457 fn lookup_or_create(&self, labels: &[(&str, &str)]) -> Arc<HistogramSeries> {
458 let requested_key = DynamicLabelSet::from_pairs(labels);
459 let requested_shard = self.index_shard_for(&requested_key);
460 #[cfg(feature = "eviction")]
461 let cycle = current_cycle();
462
463 if let Some(series) = self.index_shards[requested_shard]
465 .read()
466 .get(&requested_key)
467 {
468 #[cfg(feature = "eviction")]
469 series.touch(cycle);
470 return Arc::clone(series);
471 }
472
473 let key = if self.max_series > 0
475 && self.series_count.load(Ordering::Relaxed) >= self.max_series
476 {
477 self.overflow_count.fetch_add(1, Ordering::Relaxed);
478 DynamicLabelSet::from_pairs(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)])
479 } else {
480 requested_key
481 };
482 let shard = self.index_shard_for(&key);
483
484 if let Some(series) = self.index_shards[shard].read().get(&key) {
485 #[cfg(feature = "eviction")]
486 series.touch(cycle);
487 return Arc::clone(series);
488 }
489
490 let mut guard = self.index_shards[shard].write();
491 if let Some(series) = guard.get(&key) {
492 #[cfg(feature = "eviction")]
493 series.touch(cycle);
494 return Arc::clone(series);
495 }
496 #[cfg(feature = "eviction")]
497 let series = Arc::new(HistogramSeries::new(
498 Arc::clone(&self.bounds),
499 self.shard_count,
500 cycle,
501 ));
502 #[cfg(not(feature = "eviction"))]
503 let series = Arc::new(HistogramSeries::new(
504 Arc::clone(&self.bounds),
505 self.shard_count,
506 ));
507 guard.insert(key, Arc::clone(&series));
508 self.series_count.fetch_add(1, Ordering::Relaxed);
509 series
510 }
511
512 fn index_shard_for(&self, key: &DynamicLabelSet) -> usize {
513 let mut hasher = std::collections::hash_map::DefaultHasher::new();
514 key.hash(&mut hasher);
515 (hasher.finish() as usize) & self.shard_mask
516 }
517
518 fn cached_series(&self, labels: &[(&str, &str)]) -> Option<Arc<HistogramSeries>> {
519 SERIES_CACHE.with(|cache| {
520 let cache_ref = cache.borrow();
521 let entry = cache_ref.as_ref()?;
522 if entry.histogram_id != self.id {
523 return None;
524 }
525 if entry.ordered_labels.len() != labels.len() {
526 return None;
527 }
528 for (idx, (k, v)) in labels.iter().enumerate() {
529 let (ek, ev) = &entry.ordered_labels[idx];
530 if ek != k || ev != v {
531 return None;
532 }
533 }
534 let series = entry.series.upgrade()?;
535 if series.is_evicted() {
536 return None;
537 }
538 #[cfg(feature = "eviction")]
539 series.touch(current_cycle());
540 Some(series)
541 })
542 }
543
544 fn update_cache(&self, labels: &[(&str, &str)], series: Arc<HistogramSeries>) {
545 SERIES_CACHE.with(|cache| {
546 let ordered_labels = labels
547 .iter()
548 .map(|(k, v)| ((*k).to_string(), (*v).to_string()))
549 .collect();
550 *cache.borrow_mut() = Some(SeriesCacheEntry {
551 histogram_id: self.id,
552 ordered_labels,
553 series: Arc::downgrade(&series),
554 });
555 });
556 }
557}
558
559#[cfg(test)]
560mod tests {
561 use super::*;
562
563 #[test]
564 fn test_basic_recording() {
565 let h = DynamicHistogram::new(&[10, 100, 1000], 4);
566 let labels = &[("org_id", "42")];
567
568 h.record(labels, 5); h.record(labels, 50); h.record(labels, 500); h.record(labels, 5000); let buckets = h.buckets_cumulative(labels);
574 assert_eq!(buckets.len(), 4);
575 assert_eq!(buckets[0], (10, 1));
576 assert_eq!(buckets[1], (100, 2));
577 assert_eq!(buckets[2], (1000, 3));
578 assert_eq!(buckets[3], (u64::MAX, 4));
579
580 assert_eq!(h.count(labels), 4);
581 assert_eq!(h.sum(labels), 5 + 50 + 500 + 5000);
582 }
583
584 #[test]
585 fn test_label_order_is_canonicalized() {
586 let h = DynamicHistogram::new(&[10, 100], 4);
587
588 h.record(&[("org_id", "42"), ("endpoint", "abc")], 5);
589
590 assert_eq!(h.count(&[("endpoint", "abc"), ("org_id", "42")]), 1);
591 }
592
593 #[test]
594 fn test_series_handle() {
595 let h = DynamicHistogram::new(&[10, 100, 1000], 4);
596 let series = h.series(&[("org_id", "42")]);
597
598 series.record(5);
599 series.record(50);
600 series.record(500);
601
602 assert_eq!(series.count(), 3);
603 assert_eq!(series.sum(), 555);
604 assert_eq!(h.count(&[("org_id", "42")]), 3);
605 }
606
607 #[test]
608 fn test_multiple_label_sets() {
609 let h = DynamicHistogram::new(&[100], 4);
610
611 h.record(&[("org_id", "1")], 50);
612 h.record(&[("org_id", "2")], 150);
613
614 assert_eq!(h.count(&[("org_id", "1")]), 1);
615 assert_eq!(h.count(&[("org_id", "2")]), 1);
616
617 let snap = h.snapshot();
618 assert_eq!(snap.len(), 2);
619 }
620
621 #[test]
622 fn test_overflow_bucket_routes_new_series_at_capacity() {
623 let h = DynamicHistogram::with_limits(&[100], 4, 1);
624
625 h.record(&[("org_id", "1")], 50);
626 h.record(&[("org_id", "2")], 150);
627
628 assert_eq!(h.cardinality(), 2);
629 assert_eq!(h.count(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)]), 1);
630 assert_eq!(h.sum(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)]), 150);
631 }
632
    /// Stresses the racy series cap: 16 threads each create 5 unique label
    /// sets at once, then we check the overshoot stays within the bound
    /// documented for `lookup_or_create`.
    #[test]
    fn test_concurrent_cap_bounded_overshoot() {
        use std::sync::{Arc, Barrier};
        use std::thread;

        let cap = 10;
        let threads = 16;
        let h = Arc::new(DynamicHistogram::with_limits(&[100, 1000], 4, cap));
        // Barrier maximizes contention by releasing all threads together.
        let barrier = Arc::new(Barrier::new(threads));

        let handles: Vec<_> = (0..threads)
            .map(|t| {
                let h = Arc::clone(&h);
                let barrier = Arc::clone(&barrier);
                thread::spawn(move || {
                    barrier.wait();
                    for i in 0..5 {
                        // Unique label per (thread, iteration) forces a
                        // series-creation attempt every time.
                        let label = format!("t{t}_s{i}");
                        h.record(&[("key", &label)], 42);
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        // The load-then-insert cap check is racy, so allow at most one
        // extra series per thread plus the overflow series itself.
        let card = h.cardinality();
        assert!(
            card <= cap + threads + 1,
            "cardinality {card} exceeded bounded overshoot (cap={cap}, threads={threads})"
        );
        assert!(h.overflow_count() > 0, "overflow should have triggered");
    }
668}