1use super::cache::{CacheValue, LabelCache, SERIES_CACHE_SIZE};
8#[cfg(feature = "eviction")]
9use super::current_cycle;
10use super::{DISTRIBUTION_IDS, DynamicLabelSet};
11use crate::exp_buckets::{ExpBuckets, ExpBucketsSnapshot};
12use crossbeam_utils::CachePadded;
13use parking_lot::{Mutex, RwLock};
14use std::cell::RefCell;
15use std::collections::HashMap;
16use std::hash::{Hash, Hasher};
17use std::sync::Arc;
18use std::sync::Weak;
19#[cfg(feature = "eviction")]
20use std::sync::atomic::AtomicU32;
21use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
22
/// Series cap used by `DynamicDistribution::new`.
const DEFAULT_MAX_SERIES: usize = 2_000;

/// Label key of the shared series that absorbs label sets once the cap is reached.
const OVERFLOW_LABEL_KEY: &str = "__ft_overflow";
/// Label value paired with `OVERFLOW_LABEL_KEY` on the overflow series.
const OVERFLOW_LABEL_VALUE: &str = "true";
26type DistributionIndexShard =
27 CachePadded<RwLock<HashMap<DynamicLabelSet, Arc<DistributionSeries>>>>;
28type DistributionSnapshotEntry = (DynamicLabelSet, u64, u64, ExpBucketsSnapshot);
29static SERIES_IDS: AtomicUsize = AtomicUsize::new(1);
30
/// One label set's distribution, aggregated on read from the per-thread
/// `ExpBuckets` buffers registered with it.
struct DistributionSeries {
    /// Process-unique id drawn from `SERIES_IDS`; keys the thread-local buffer cache.
    id: usize,
    /// Buffers handed out by `get_or_create_buf` (typically one per recording
    /// thread); every read path sums across all of them.
    registry: Mutex<Vec<Arc<ExpBuckets>>>,
    /// Set when the series is removed from its distribution's index, so cached
    /// entries that still reach it are rejected as invalid.
    evicted: AtomicBool,
    /// Value of `current_cycle()` at the most recent access; compared against the
    /// current cycle by `evict_stale`.
    #[cfg(feature = "eviction")]
    last_accessed_cycle: AtomicU32,
}
40
41impl DistributionSeries {
42 #[cfg(feature = "eviction")]
43 fn new(cycle: u32) -> Self {
44 Self {
45 id: SERIES_IDS.fetch_add(1, Ordering::Relaxed),
46 registry: Mutex::new(Vec::new()),
47 evicted: AtomicBool::new(false),
48 last_accessed_cycle: AtomicU32::new(cycle),
49 }
50 }
51
52 #[cfg(not(feature = "eviction"))]
53 fn new() -> Self {
54 Self {
55 id: SERIES_IDS.fetch_add(1, Ordering::Relaxed),
56 registry: Mutex::new(Vec::new()),
57 evicted: AtomicBool::new(false),
58 }
59 }
60
61 #[cfg(feature = "eviction")]
63 #[inline]
64 fn touch(&self, cycle: u32) {
65 self.last_accessed_cycle.store(cycle, Ordering::Relaxed);
66 }
67
68 #[inline]
69 fn is_evicted(&self) -> bool {
70 self.evicted.load(Ordering::Relaxed)
71 }
72
73 #[cfg(feature = "eviction")]
74 fn mark_evicted(&self) {
75 self.evicted.store(true, Ordering::Relaxed);
76 }
77
78 fn get_or_create_buf(&self) -> Arc<ExpBuckets> {
79 let buf = Arc::new(ExpBuckets::new());
80 self.registry.lock().push(Arc::clone(&buf));
81 buf
82 }
83
84 fn count(&self) -> u64 {
85 self.registry.lock().iter().map(|buf| buf.get_count()).sum()
86 }
87
88 fn sum(&self) -> u64 {
89 self.registry.lock().iter().map(|buf| buf.get_sum()).sum()
90 }
91
92 fn buckets_snapshot(&self) -> ExpBucketsSnapshot {
93 let mut positive = [0u64; 64];
94 let mut zero_count = 0u64;
95 let mut sum = 0u64;
96 let mut count = 0u64;
97
98 let registry = self.registry.lock();
99 for buf in registry.iter() {
100 let thread_buckets = buf.get_positive_buckets();
101 for (i, &c) in thread_buckets.iter().enumerate() {
102 positive[i] += c;
103 }
104 zero_count += buf.get_zero_count();
105 sum += buf.get_sum();
106 count += buf.get_count();
107 }
108
109 ExpBucketsSnapshot {
110 positive,
111 zero_count,
112 sum,
113 count,
114 }
115 }
116}
117
/// Thread-local cache entry for one `(distribution, labels)` pair.
struct DistributionCacheValue {
    /// Weak, so a cached entry does not count as an outstanding strong reference
    /// in `evict_stale`'s in-use check and cannot by itself keep the series alive.
    series: Weak<DistributionSeries>,
    /// This thread's buffer for the series.
    buf: Arc<ExpBuckets>,
}
122
123impl CacheValue for DistributionCacheValue {
124 type Strong = (Arc<DistributionSeries>, Arc<ExpBuckets>);
125
126 fn upgrade(&self) -> Option<Self::Strong> {
127 Some((self.series.upgrade()?, Arc::clone(&self.buf)))
128 }
129
130 fn is_valid(strong: &Self::Strong) -> bool {
131 !strong.0.is_evicted()
132 }
133}
134
/// Handle to one label set of a `DynamicDistribution`, returned by
/// `DynamicDistribution::series`.
///
/// The handle holds the series and a buffer directly, so recording through it
/// skips label lookup. Clones share the same series and buffer.
#[derive(Clone)]
pub struct DynamicDistributionSeries {
    series: Arc<DistributionSeries>,
    buf: Arc<ExpBuckets>,
}
145
146impl DynamicDistributionSeries {
147 #[inline]
149 pub fn record(&self, value: u64) {
150 self.buf.record(value);
151 }
152
153 pub fn count(&self) -> u64 {
155 self.series.count()
156 }
157
158 pub fn sum(&self) -> u64 {
160 self.series.sum()
161 }
162
163 #[inline]
165 pub fn is_evicted(&self) -> bool {
166 self.series.is_evicted()
167 }
168}
169
thread_local! {
    /// Per-thread cache from `(distribution id, labels)` to the resolved series and
    /// this thread's buffer; consulted before any index lock is taken.
    static SERIES_CACHE: RefCell<LabelCache<DistributionCacheValue, SERIES_CACHE_SIZE>> =
        RefCell::new(LabelCache::new());
    /// Per-thread `(distribution id, series id, buffer)` entries so each thread
    /// reuses one registered buffer per series. Entries are weak, and dead ones
    /// are pruned on the next lookup.
    static SERIES_BUF_CACHE: RefCell<Vec<(usize, usize, Weak<ExpBuckets>)>> = const { RefCell::new(Vec::new()) };
}
175
/// Distribution metric keyed by a dynamic label set.
///
/// Series live in a sharded index and are capped at `max_series`. Once the cap is
/// reached, label sets not yet indexed are redirected to a shared overflow series.
pub struct DynamicDistribution {
    /// Process-unique id from `DISTRIBUTION_IDS`; namespaces the thread-local caches.
    id: usize,
    /// Series cap; 0 disables the cap.
    max_series: usize,
    /// `index_shards.len() - 1`; the shard count is always a power of two.
    shard_mask: usize,
    /// Sharded label-set -> series index.
    index_shards: Vec<DistributionIndexShard>,
    /// Number of indexed series (including the overflow series); checked against the cap.
    series_count: AtomicUsize,
    /// Number of slow-path lookups that were redirected to the overflow series.
    overflow_count: AtomicU64,
}
189
190impl DynamicDistribution {
191 pub fn new(shard_count: usize) -> Self {
193 Self::with_max_series(shard_count, DEFAULT_MAX_SERIES)
194 }
195
196 pub fn with_max_series(shard_count: usize, max_series: usize) -> Self {
198 let shard_count = shard_count.next_power_of_two();
199 let id = DISTRIBUTION_IDS.fetch_add(1, Ordering::Relaxed);
200 Self {
201 id,
202 max_series,
203 shard_mask: shard_count - 1,
204 index_shards: (0..shard_count)
205 .map(|_| CachePadded::new(RwLock::new(HashMap::new())))
206 .collect(),
207 series_count: AtomicUsize::new(0),
208 overflow_count: AtomicU64::new(0),
209 }
210 }
211
212 pub fn series(&self, labels: &[(&str, &str)]) -> DynamicDistributionSeries {
216 if let Some((series, buf)) = self.cached_series(labels) {
217 return DynamicDistributionSeries { series, buf };
218 }
219 let series = self.lookup_or_create(labels);
220 let buf = self.get_or_create_thread_buf(&series);
221 self.update_cache(labels, &series, Arc::clone(&buf));
222 DynamicDistributionSeries { series, buf }
223 }
224
225 #[inline]
227 pub fn record(&self, labels: &[(&str, &str)], value: u64) {
228 if let Some((_series, buf)) = self.cached_series(labels) {
229 buf.record(value);
230 return;
231 }
232
233 let series = self.lookup_or_create(labels);
234 let buf = self.get_or_create_thread_buf(&series);
235 self.update_cache(labels, &series, Arc::clone(&buf));
236 buf.record(value);
237 }
238
239 pub fn count(&self, labels: &[(&str, &str)]) -> u64 {
241 let key = DynamicLabelSet::from_pairs(labels);
242 let index_shard = self.index_shard_for(&key);
243 self.index_shards[index_shard]
244 .read()
245 .get(&key)
246 .map(|series| series.count())
247 .unwrap_or(0)
248 }
249
250 pub fn sum(&self, labels: &[(&str, &str)]) -> u64 {
252 let key = DynamicLabelSet::from_pairs(labels);
253 let index_shard = self.index_shard_for(&key);
254 self.index_shards[index_shard]
255 .read()
256 .get(&key)
257 .map(|series| series.sum())
258 .unwrap_or(0)
259 }
260
261 pub fn snapshot(&self) -> Vec<DistributionSnapshotEntry> {
263 let mut out = Vec::new();
264 for shard in &self.index_shards {
265 let guard = shard.read();
266 for (labels, series) in guard.iter() {
267 let snap = series.buckets_snapshot();
268 out.push((labels.clone(), snap.count, snap.sum, snap));
269 }
270 }
271 out
272 }
273
274 pub fn cardinality(&self) -> usize {
276 self.index_shards
277 .iter()
278 .map(|shard| shard.read().len())
279 .sum()
280 }
281
282 pub fn overflow_count(&self) -> u64 {
287 self.overflow_count.load(Ordering::Relaxed)
288 }
289
290 #[doc(hidden)]
295 pub fn visit_series(
296 &self,
297 mut f: impl FnMut(&[(String, String)], u64, u64, ExpBucketsSnapshot),
298 ) {
299 for shard in &self.index_shards {
300 let guard = shard.read();
301 for (labels, series) in guard.iter() {
302 let snap = series.buckets_snapshot();
303 f(labels.pairs(), snap.count, snap.sum, snap);
304 }
305 }
306 }
307
308 #[cfg(feature = "eviction")]
319 pub fn evict_stale(&self, max_staleness: u32) -> usize {
320 let cycle = current_cycle();
321 let mut removed = 0;
322
323 for shard in &self.index_shards {
324 let mut guard = shard.write();
325 guard.retain(|_labels, series| {
326 if Arc::strong_count(series) > 1 {
329 return true;
330 }
331 let last = series.last_accessed_cycle.load(Ordering::Relaxed);
333 let stale = cycle.saturating_sub(last) > max_staleness;
334 if stale {
335 series.mark_evicted();
336 removed += 1;
337 self.series_count.fetch_sub(1, Ordering::Relaxed);
338 }
339 !stale
340 });
341 }
342
343 removed
344 }
345
346 fn lookup_or_create(&self, labels: &[(&str, &str)]) -> Arc<DistributionSeries> {
347 let requested_key = DynamicLabelSet::from_pairs(labels);
348 let requested_shard = self.index_shard_for(&requested_key);
349 #[cfg(feature = "eviction")]
350 let cycle = current_cycle();
351
352 if let Some(series) = self.index_shards[requested_shard]
354 .read()
355 .get(&requested_key)
356 {
357 #[cfg(feature = "eviction")]
358 series.touch(cycle);
359 return Arc::clone(series);
360 }
361
362 let key = if self.max_series > 0
364 && self.series_count.load(Ordering::Relaxed) >= self.max_series
365 {
366 self.overflow_count.fetch_add(1, Ordering::Relaxed);
367 DynamicLabelSet::from_pairs(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)])
368 } else {
369 requested_key
370 };
371 let shard = self.index_shard_for(&key);
372
373 if let Some(series) = self.index_shards[shard].read().get(&key) {
374 #[cfg(feature = "eviction")]
375 series.touch(cycle);
376 return Arc::clone(series);
377 }
378
379 let mut guard = self.index_shards[shard].write();
380 if let Some(series) = guard.get(&key) {
381 #[cfg(feature = "eviction")]
382 series.touch(cycle);
383 return Arc::clone(series);
384 }
385 #[cfg(feature = "eviction")]
386 let series = Arc::new(DistributionSeries::new(cycle));
387 #[cfg(not(feature = "eviction"))]
388 let series = Arc::new(DistributionSeries::new());
389 guard.insert(key, Arc::clone(&series));
390 self.series_count.fetch_add(1, Ordering::Relaxed);
391 series
392 }
393
394 fn index_shard_for(&self, key: &DynamicLabelSet) -> usize {
395 let mut hasher = std::collections::hash_map::DefaultHasher::new();
396 key.hash(&mut hasher);
397 (hasher.finish() as usize) & self.shard_mask
398 }
399
400 fn cached_series(
401 &self,
402 labels: &[(&str, &str)],
403 ) -> Option<(Arc<DistributionSeries>, Arc<ExpBuckets>)> {
404 SERIES_CACHE.with(|cache| {
405 let (series, buf) = cache.borrow_mut().get(self.id, labels)?;
406 #[cfg(feature = "eviction")]
407 series.touch(current_cycle());
408 Some((series, buf))
409 })
410 }
411
412 fn update_cache(
413 &self,
414 labels: &[(&str, &str)],
415 series: &Arc<DistributionSeries>,
416 buf: Arc<ExpBuckets>,
417 ) {
418 SERIES_CACHE.with(|cache| {
419 cache.borrow_mut().insert(
420 self.id,
421 labels,
422 DistributionCacheValue {
423 series: Arc::downgrade(series),
424 buf,
425 },
426 );
427 });
428 }
429
430 fn get_or_create_thread_buf(&self, series: &Arc<DistributionSeries>) -> Arc<ExpBuckets> {
431 let dist_id = self.id;
432 let series_id = series.id;
433
434 SERIES_BUF_CACHE.with(|cache| {
435 let mut entries = cache.borrow_mut();
436 entries.retain(|(_id, _ptr, weak)| weak.strong_count() > 0);
437
438 for (id, ptr, weak) in entries.iter() {
439 if *id == dist_id
440 && *ptr == series_id
441 && let Some(buf) = weak.upgrade()
442 {
443 return buf;
444 }
445 }
446
447 let buf = series.get_or_create_buf();
448 entries.push((dist_id, series_id, Arc::downgrade(&buf)));
449 buf
450 })
451 }
452}
453
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_basic_recording() {
        let dist = DynamicDistribution::new(4);
        let labels = &[("org_id", "42")];

        dist.record(labels, 100);
        dist.record(labels, 200);
        dist.record(labels, 300);

        assert_eq!(dist.count(labels), 3);
        assert_eq!(dist.sum(labels), 600);
    }

    #[test]
    fn test_label_order_is_canonicalized() {
        let dist = DynamicDistribution::new(4);

        dist.record(&[("org_id", "42"), ("endpoint", "abc")], 100);

        assert_eq!(dist.count(&[("endpoint", "abc"), ("org_id", "42")]), 1);
    }

    #[test]
    fn test_series_handle() {
        let dist = DynamicDistribution::new(4);
        let series = dist.series(&[("org_id", "42")]);

        series.record(100);
        series.record(200);

        assert_eq!(series.count(), 2);
        assert_eq!(series.sum(), 300);
        assert_eq!(dist.count(&[("org_id", "42")]), 2);
    }

    #[test]
    fn test_handle_and_record_share_series() {
        let dist = DynamicDistribution::new(4);
        let labels = &[("org_id", "7")];
        let handle = dist.series(labels);

        dist.record(labels, 5);
        handle.record(7);

        assert_eq!(handle.count(), 2);
        assert_eq!(handle.sum(), 12);
        assert_eq!(dist.cardinality(), 1);
    }

    #[test]
    fn test_multiple_label_sets() {
        let dist = DynamicDistribution::new(4);

        dist.record(&[("org_id", "1")], 100);
        dist.record(&[("org_id", "2")], 200);

        assert_eq!(dist.count(&[("org_id", "1")]), 1);
        assert_eq!(dist.count(&[("org_id", "2")]), 1);

        let snap = dist.snapshot();
        assert_eq!(snap.len(), 2);
    }

    #[test]
    fn test_zero_shard_count_uses_single_shard() {
        let dist = DynamicDistribution::new(0);

        dist.record(&[("org_id", "1")], 10);

        assert_eq!(dist.count(&[("org_id", "1")]), 1);
        assert_eq!(dist.cardinality(), 1);
    }

    #[test]
    fn test_overflow_bucket_routes_new_series_at_capacity() {
        let dist = DynamicDistribution::with_max_series(4, 2);

        dist.record(&[("org_id", "1")], 100);
        dist.record(&[("org_id", "2")], 200);
        dist.record(&[("org_id", "3")], 300);

        // Two regular series plus the shared overflow series.
        assert_eq!(dist.cardinality(), 3);
        assert_eq!(dist.overflow_count(), 1);
        // The redirected label set is not indexed under its own labels.
        assert_eq!(dist.count(&[("org_id", "3")]), 0);
        assert_eq!(dist.count(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)]), 1);
        assert_eq!(dist.sum(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)]), 300);
    }

    #[test]
    fn test_overflow_series_is_shared_by_all_excess_label_sets() {
        let dist = DynamicDistribution::with_max_series(4, 2);

        for (org, value) in [("1", 100), ("2", 200), ("3", 300), ("4", 400)] {
            dist.record(&[("org_id", org)], value);
        }

        assert_eq!(dist.cardinality(), 3);
        assert_eq!(dist.overflow_count(), 2);
        assert_eq!(dist.count(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)]), 2);
        assert_eq!(dist.sum(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)]), 700);
    }

    #[test]
    fn test_zero_max_series_disables_cap() {
        let dist = DynamicDistribution::with_max_series(4, 0);

        for i in 0..10 {
            let org = i.to_string();
            dist.record(&[("org_id", org.as_str())], 1);
        }

        assert_eq!(dist.cardinality(), 10);
        assert_eq!(dist.overflow_count(), 0);
    }

    #[test]
    fn test_snapshot_includes_buckets() {
        let dist = DynamicDistribution::new(4);
        dist.record(&[("org_id", "1")], 100);
        dist.record(&[("org_id", "1")], 200);

        let snap = dist.snapshot();
        assert_eq!(snap.len(), 1);
        let (_, count, sum, bucket_snap) = &snap[0];
        assert_eq!(*count, 2);
        assert_eq!(*sum, 300);
        assert!(bucket_snap.positive[6] > 0 || bucket_snap.positive[7] > 0);
    }
}