1use super::cache::{CacheValue, LabelCache, SERIES_CACHE_SIZE};
8#[cfg(feature = "eviction")]
9use super::current_cycle;
10use super::{DISTRIBUTION_IDS, DynamicLabelSet};
11use crate::exp_buckets::{ExpBuckets, ExpBucketsSnapshot};
12use crossbeam_utils::CachePadded;
13use parking_lot::{Mutex, RwLock};
14use std::cell::RefCell;
15use std::collections::HashMap;
16use std::hash::{Hash, Hasher};
17use std::sync::Arc;
18use std::sync::Weak;
19#[cfg(feature = "eviction")]
20use std::sync::atomic::AtomicU32;
21use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
22
/// Series cap applied by `DynamicDistribution::new`.
const DEFAULT_MAX_SERIES: usize = 2_000;
/// Label key of the shared series that absorbs new label sets once the cap is hit.
const OVERFLOW_LABEL_KEY: &str = "__ft_overflow";
/// Label value paired with [`OVERFLOW_LABEL_KEY`].
const OVERFLOW_LABEL_VALUE: &str = "true";
26type DistributionIndexShard =
27 CachePadded<RwLock<HashMap<DynamicLabelSet, Arc<DistributionSeries>>>>;
28type DistributionSnapshotEntry = (DynamicLabelSet, u64, u64, ExpBucketsSnapshot);
29static SERIES_IDS: AtomicUsize = AtomicUsize::new(1);
30
31struct DistributionSeries {
32 id: usize,
33 registry: Mutex<Vec<Arc<ExpBuckets>>>,
34 evicted: AtomicBool,
36 #[cfg(feature = "eviction")]
38 last_accessed_cycle: AtomicU32,
39}
40
41impl DistributionSeries {
42 #[cfg(feature = "eviction")]
43 fn new(cycle: u32) -> Self {
44 Self {
45 id: SERIES_IDS.fetch_add(1, Ordering::Relaxed),
46 registry: Mutex::new(Vec::new()),
47 evicted: AtomicBool::new(false),
48 last_accessed_cycle: AtomicU32::new(cycle),
49 }
50 }
51
52 #[cfg(not(feature = "eviction"))]
53 fn new() -> Self {
54 Self {
55 id: SERIES_IDS.fetch_add(1, Ordering::Relaxed),
56 registry: Mutex::new(Vec::new()),
57 evicted: AtomicBool::new(false),
58 }
59 }
60
61 #[cfg(feature = "eviction")]
63 #[inline]
64 fn touch(&self, cycle: u32) {
65 self.last_accessed_cycle.store(cycle, Ordering::Relaxed);
66 }
67
68 #[inline]
69 fn is_evicted(&self) -> bool {
70 self.evicted.load(Ordering::Relaxed)
71 }
72
73 #[cfg(feature = "eviction")]
74 fn mark_evicted(&self) {
75 self.evicted.store(true, Ordering::Relaxed);
76 }
77
78 fn get_or_create_buf(&self) -> Arc<ExpBuckets> {
79 let buf = Arc::new(ExpBuckets::new());
80 self.registry.lock().push(Arc::clone(&buf));
81 buf
82 }
83
84 fn count(&self) -> u64 {
85 self.registry.lock().iter().map(|buf| buf.get_count()).sum()
86 }
87
88 fn sum(&self) -> u64 {
89 self.registry.lock().iter().map(|buf| buf.get_sum()).sum()
90 }
91
92 fn buckets_snapshot(&self) -> ExpBucketsSnapshot {
93 let mut positive = [0u64; 64];
94 let mut zero_count = 0u64;
95 let mut sum = 0u64;
96 let mut count = 0u64;
97
98 let registry = self.registry.lock();
99 for buf in registry.iter() {
100 let thread_buckets = buf.get_positive_buckets();
101 for (i, &c) in thread_buckets.iter().enumerate() {
102 positive[i] += c;
103 }
104 zero_count += buf.get_zero_count();
105 sum += buf.get_sum();
106 count += buf.get_count();
107 }
108
109 ExpBucketsSnapshot {
110 positive,
111 zero_count,
112 sum,
113 count,
114 }
115 }
116}
117
118struct DistributionCacheValue {
119 series: Weak<DistributionSeries>,
120 buf: Arc<ExpBuckets>,
121}
122
123impl CacheValue for DistributionCacheValue {
124 type Strong = (Arc<DistributionSeries>, Arc<ExpBuckets>);
125
126 fn upgrade(&self) -> Option<Self::Strong> {
127 Some((self.series.upgrade()?, Arc::clone(&self.buf)))
128 }
129
130 fn is_valid(strong: &Self::Strong) -> bool {
131 !strong.0.is_evicted()
132 }
133}
134
135#[derive(Clone)]
141pub struct DynamicDistributionSeries {
142 series: Arc<DistributionSeries>,
143 buf: Arc<ExpBuckets>,
144}
145
146impl DynamicDistributionSeries {
147 #[inline]
149 pub fn record(&self, value: u64) {
150 self.buf.record(value);
151 }
152
153 pub fn count(&self) -> u64 {
155 self.series.count()
156 }
157
158 pub fn sum(&self) -> u64 {
160 self.series.sum()
161 }
162
163 #[inline]
165 pub fn is_evicted(&self) -> bool {
166 self.series.is_evicted()
167 }
168}
169
170thread_local! {
171 static SERIES_CACHE: RefCell<LabelCache<DistributionCacheValue, SERIES_CACHE_SIZE>> =
172 RefCell::new(LabelCache::new());
173 static SERIES_BUF_CACHE: RefCell<Vec<(usize, usize, Weak<ExpBuckets>)>> = const { RefCell::new(Vec::new()) };
174}
175
176pub struct DynamicDistribution {
180 id: usize,
181 max_series: usize,
182 shard_mask: usize,
183 index_shards: Vec<DistributionIndexShard>,
184 series_count: AtomicUsize,
186 overflow_count: AtomicU64,
188}
189
190impl DynamicDistribution {
191 pub fn new(shard_count: usize) -> Self {
193 Self::with_max_series(shard_count, DEFAULT_MAX_SERIES)
194 }
195
196 pub fn with_max_series(shard_count: usize, max_series: usize) -> Self {
198 let shard_count = shard_count.next_power_of_two();
199 let id = DISTRIBUTION_IDS.fetch_add(1, Ordering::Relaxed);
200 Self {
201 id,
202 max_series,
203 shard_mask: shard_count - 1,
204 index_shards: (0..shard_count)
205 .map(|_| CachePadded::new(RwLock::new(HashMap::new())))
206 .collect(),
207 series_count: AtomicUsize::new(0),
208 overflow_count: AtomicU64::new(0),
209 }
210 }
211
212 pub fn series(&self, labels: &[(&str, &str)]) -> DynamicDistributionSeries {
216 if let Some((series, buf)) = self.cached_series(labels) {
217 return DynamicDistributionSeries { series, buf };
218 }
219 let series = self.lookup_or_create(labels);
220 let buf = self.get_or_create_thread_buf(&series);
221 self.update_cache(labels, &series, Arc::clone(&buf));
222 DynamicDistributionSeries { series, buf }
223 }
224
225 #[inline]
227 pub fn record(&self, labels: &[(&str, &str)], value: u64) {
228 if let Some((_series, buf)) = self.cached_series(labels) {
229 buf.record(value);
230 return;
231 }
232
233 let series = self.lookup_or_create(labels);
234 let buf = self.get_or_create_thread_buf(&series);
235 self.update_cache(labels, &series, Arc::clone(&buf));
236 buf.record(value);
237 }
238
239 pub fn count(&self, labels: &[(&str, &str)]) -> u64 {
241 let key = DynamicLabelSet::from_pairs(labels);
242 let index_shard = self.index_shard_for(&key);
243 self.index_shards[index_shard]
244 .read()
245 .get(&key)
246 .map(|series| series.count())
247 .unwrap_or(0)
248 }
249
250 pub fn sum(&self, labels: &[(&str, &str)]) -> u64 {
252 let key = DynamicLabelSet::from_pairs(labels);
253 let index_shard = self.index_shard_for(&key);
254 self.index_shards[index_shard]
255 .read()
256 .get(&key)
257 .map(|series| series.sum())
258 .unwrap_or(0)
259 }
260
261 pub fn snapshot(&self) -> Vec<DistributionSnapshotEntry> {
263 let mut out = Vec::new();
264 for shard in &self.index_shards {
265 let guard = shard.read();
266 for (labels, series) in guard.iter() {
267 let snap = series.buckets_snapshot();
268 out.push((labels.clone(), snap.count, snap.sum, snap));
269 }
270 }
271 out
272 }
273
274 pub fn cardinality(&self) -> usize {
276 self.index_shards
277 .iter()
278 .map(|shard| shard.read().len())
279 .sum()
280 }
281
282 pub fn overflow_count(&self) -> u64 {
287 self.overflow_count.load(Ordering::Relaxed)
288 }
289
290 #[doc(hidden)]
297 pub fn visit_series(
298 &self,
299 mut f: impl FnMut(&[(String, String)], u64, u64, ExpBucketsSnapshot),
300 ) {
301 for shard in &self.index_shards {
302 let guard = shard.read();
303 for (labels, series) in guard.iter() {
304 let snap = series.buckets_snapshot();
305 f(labels.pairs(), snap.count, snap.sum, snap);
306 }
307 }
308 }
309
310 #[cfg(feature = "eviction")]
321 pub fn evict_stale(&self, max_staleness: u32) -> usize {
322 let cycle = current_cycle();
323 let mut removed = 0;
324
325 for shard in &self.index_shards {
326 let mut guard = shard.write();
327 guard.retain(|_labels, series| {
328 if Arc::strong_count(series) > 1 {
331 return true;
332 }
333 let last = series.last_accessed_cycle.load(Ordering::Relaxed);
335 let stale = cycle.saturating_sub(last) > max_staleness;
336 if stale {
337 series.mark_evicted();
338 removed += 1;
339 self.series_count.fetch_sub(1, Ordering::Relaxed);
340 }
341 !stale
342 });
343 }
344
345 removed
346 }
347
348 fn lookup_or_create(&self, labels: &[(&str, &str)]) -> Arc<DistributionSeries> {
349 let requested_key = DynamicLabelSet::from_pairs(labels);
350 let requested_shard = self.index_shard_for(&requested_key);
351 #[cfg(feature = "eviction")]
352 let cycle = current_cycle();
353
354 if let Some(series) = self.index_shards[requested_shard]
356 .read()
357 .get(&requested_key)
358 {
359 #[cfg(feature = "eviction")]
360 series.touch(cycle);
361 return Arc::clone(series);
362 }
363
364 let key = if self.max_series > 0
366 && self.series_count.load(Ordering::Relaxed) >= self.max_series
367 {
368 self.overflow_count.fetch_add(1, Ordering::Relaxed);
369 DynamicLabelSet::from_pairs(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)])
370 } else {
371 requested_key
372 };
373 let shard = self.index_shard_for(&key);
374
375 if let Some(series) = self.index_shards[shard].read().get(&key) {
376 #[cfg(feature = "eviction")]
377 series.touch(cycle);
378 return Arc::clone(series);
379 }
380
381 let mut guard = self.index_shards[shard].write();
382 if let Some(series) = guard.get(&key) {
383 #[cfg(feature = "eviction")]
384 series.touch(cycle);
385 return Arc::clone(series);
386 }
387 #[cfg(feature = "eviction")]
388 let series = Arc::new(DistributionSeries::new(cycle));
389 #[cfg(not(feature = "eviction"))]
390 let series = Arc::new(DistributionSeries::new());
391 guard.insert(key, Arc::clone(&series));
392 self.series_count.fetch_add(1, Ordering::Relaxed);
393 series
394 }
395
396 fn index_shard_for(&self, key: &DynamicLabelSet) -> usize {
397 let mut hasher = std::collections::hash_map::DefaultHasher::new();
398 key.hash(&mut hasher);
399 (hasher.finish() as usize) & self.shard_mask
400 }
401
402 fn cached_series(
403 &self,
404 labels: &[(&str, &str)],
405 ) -> Option<(Arc<DistributionSeries>, Arc<ExpBuckets>)> {
406 SERIES_CACHE.with(|cache| {
407 let (series, buf) = cache.borrow_mut().get(self.id, labels)?;
408 #[cfg(feature = "eviction")]
409 series.touch(current_cycle());
410 Some((series, buf))
411 })
412 }
413
414 fn update_cache(
415 &self,
416 labels: &[(&str, &str)],
417 series: &Arc<DistributionSeries>,
418 buf: Arc<ExpBuckets>,
419 ) {
420 SERIES_CACHE.with(|cache| {
421 cache.borrow_mut().insert(
422 self.id,
423 labels,
424 DistributionCacheValue {
425 series: Arc::downgrade(series),
426 buf,
427 },
428 );
429 });
430 }
431
432 fn get_or_create_thread_buf(&self, series: &Arc<DistributionSeries>) -> Arc<ExpBuckets> {
433 let dist_id = self.id;
434 let series_id = series.id;
435
436 SERIES_BUF_CACHE.with(|cache| {
437 let mut entries = cache.borrow_mut();
438 entries.retain(|(_id, _ptr, weak)| weak.strong_count() > 0);
439
440 for (id, ptr, weak) in entries.iter() {
441 if *id == dist_id
442 && *ptr == series_id
443 && let Some(buf) = weak.upgrade()
444 {
445 return buf;
446 }
447 }
448
449 let buf = series.get_or_create_buf();
450 entries.push((dist_id, series_id, Arc::downgrade(&buf)));
451 buf
452 })
453 }
454}
455
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_basic_recording() {
        let dist = DynamicDistribution::new(4);
        let labels = &[("org_id", "42")];

        dist.record(labels, 100);
        dist.record(labels, 200);
        dist.record(labels, 300);

        assert_eq!(dist.count(labels), 3);
        assert_eq!(dist.sum(labels), 600);
    }

    #[test]
    fn test_label_order_is_canonicalized() {
        let dist = DynamicDistribution::new(4);

        dist.record(&[("org_id", "42"), ("endpoint", "abc")], 100);

        assert_eq!(dist.count(&[("endpoint", "abc"), ("org_id", "42")]), 1);
    }

    #[test]
    fn test_series_handle() {
        let dist = DynamicDistribution::new(4);
        let series = dist.series(&[("org_id", "42")]);

        series.record(100);
        series.record(200);

        assert_eq!(series.count(), 2);
        assert_eq!(series.sum(), 300);
        assert_eq!(dist.count(&[("org_id", "42")]), 2);
    }

    #[test]
    fn test_handle_and_record_share_series() {
        let dist = DynamicDistribution::new(4);
        let labels = &[("org_id", "7")];
        let handle = dist.series(labels);

        handle.record(10);
        dist.record(labels, 20);

        assert_eq!(handle.count(), 2);
        assert_eq!(dist.sum(labels), 30);
        assert_eq!(dist.cardinality(), 1);
    }

    #[test]
    fn test_multiple_label_sets() {
        let dist = DynamicDistribution::new(4);

        dist.record(&[("org_id", "1")], 100);
        dist.record(&[("org_id", "2")], 200);

        assert_eq!(dist.count(&[("org_id", "1")]), 1);
        assert_eq!(dist.count(&[("org_id", "2")]), 1);

        let snap = dist.snapshot();
        assert_eq!(snap.len(), 2);
    }

    #[test]
    fn test_overflow_bucket_routes_new_series_at_capacity() {
        let dist = DynamicDistribution::with_max_series(4, 2);

        dist.record(&[("org_id", "1")], 100);
        dist.record(&[("org_id", "2")], 200);
        dist.record(&[("org_id", "3")], 300);

        // Two real series plus the overflow series.
        assert_eq!(dist.cardinality(), 3);
        assert_eq!(dist.overflow_count(), 1);
        // The overflowed label set never gets its own index entry.
        assert_eq!(dist.count(&[("org_id", "3")]), 0);
        assert_eq!(dist.count(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)]), 1);
        assert_eq!(dist.sum(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)]), 300);
    }

    #[test]
    fn test_zero_max_series_is_unbounded() {
        let dist = DynamicDistribution::with_max_series(1, 0);

        for i in 0..5 {
            let value = i.to_string();
            dist.record(&[("org_id", value.as_str())], 1);
        }

        assert_eq!(dist.cardinality(), 5);
        assert_eq!(dist.overflow_count(), 0);
    }

    #[test]
    fn test_counts_aggregate_across_threads() {
        let dist = DynamicDistribution::new(4);

        std::thread::scope(|s| {
            for _ in 0..4 {
                s.spawn(|| {
                    for _ in 0..10 {
                        dist.record(&[("org_id", "42")], 5);
                    }
                });
            }
        });

        assert_eq!(dist.count(&[("org_id", "42")]), 40);
        assert_eq!(dist.sum(&[("org_id", "42")]), 200);
        assert_eq!(dist.cardinality(), 1);
    }

    #[test]
    fn test_snapshot_includes_buckets() {
        let dist = DynamicDistribution::new(4);
        dist.record(&[("org_id", "1")], 100);
        dist.record(&[("org_id", "1")], 200);

        let snap = dist.snapshot();
        assert_eq!(snap.len(), 1);
        let (_, count, sum, bucket_snap) = &snap[0];
        assert_eq!(*count, 2);
        assert_eq!(*sum, 300);
        assert!(bucket_snap.positive[6] > 0 || bucket_snap.positive[7] > 0);
    }
}