1#[cfg(feature = "eviction")]
8use super::current_cycle;
9use super::{DISTRIBUTION_IDS, DynamicLabelSet};
10use crate::exp_buckets::{ExpBuckets, ExpBucketsSnapshot};
11use crossbeam_utils::CachePadded;
12use parking_lot::{Mutex, RwLock};
13use std::cell::RefCell;
14use std::collections::HashMap;
15use std::hash::{Hash, Hasher};
16use std::sync::Arc;
17use std::sync::Weak;
18#[cfg(feature = "eviction")]
19use std::sync::atomic::AtomicU32;
20use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
21
/// Default cap on distinct label sets per distribution before new series
/// are rerouted to the overflow series.
const DEFAULT_MAX_SERIES: usize = 2000;
/// Reserved label key/value for the synthetic series that absorbs
/// recordings once `max_series` is reached.
const OVERFLOW_LABEL_KEY: &str = "__ft_overflow";
const OVERFLOW_LABEL_VALUE: &str = "true";
/// One shard of the label-set -> series index. `CachePadded` keeps each
/// shard's lock on its own cache line to avoid false sharing.
type DistributionIndexShard =
    CachePadded<RwLock<HashMap<DynamicLabelSet, Arc<DistributionSeries>>>>;
/// Snapshot row: (labels, count, sum, exponential-bucket snapshot).
type DistributionSnapshotEntry = (DynamicLabelSet, u64, u64, ExpBucketsSnapshot);
/// Process-wide id source for `DistributionSeries`; starts at 1 so 0 is
/// never a valid series id.
static SERIES_IDS: AtomicUsize = AtomicUsize::new(1);
29
/// Shared state for a single label-set series: a registry of per-thread
/// recording buffers plus eviction bookkeeping.
struct DistributionSeries {
    /// Unique id from `SERIES_IDS`; keys the thread-local buffer cache.
    id: usize,
    /// Every per-thread `ExpBuckets` buffer created for this series.
    /// Reads (`count`/`sum`/`buckets_snapshot`) aggregate across all of them.
    registry: Mutex<Vec<Arc<ExpBuckets>>>,
    /// Set once the series is removed from the index; cached handles check
    /// this flag to invalidate themselves.
    evicted: AtomicBool,
    #[cfg(feature = "eviction")]
    /// Cycle counter of the most recent access, used by `evict_stale`.
    last_accessed_cycle: AtomicU32,
}
39
impl DistributionSeries {
    /// Creates an empty series stamped with the current eviction cycle.
    #[cfg(feature = "eviction")]
    fn new(cycle: u32) -> Self {
        Self {
            id: SERIES_IDS.fetch_add(1, Ordering::Relaxed),
            registry: Mutex::new(Vec::new()),
            evicted: AtomicBool::new(false),
            last_accessed_cycle: AtomicU32::new(cycle),
        }
    }

    /// Creates an empty series (eviction disabled: no cycle tracking).
    #[cfg(not(feature = "eviction"))]
    fn new() -> Self {
        Self {
            id: SERIES_IDS.fetch_add(1, Ordering::Relaxed),
            registry: Mutex::new(Vec::new()),
            evicted: AtomicBool::new(false),
        }
    }

    /// Records an access at `cycle` so `evict_stale` sees this series as fresh.
    #[cfg(feature = "eviction")]
    #[inline]
    fn touch(&self, cycle: u32) {
        self.last_accessed_cycle.store(cycle, Ordering::Relaxed);
    }

    /// True once the series has been removed from its distribution's index.
    #[inline]
    fn is_evicted(&self) -> bool {
        self.evicted.load(Ordering::Relaxed)
    }

    /// Flags the series as evicted; cached handles stop using it on next check.
    #[cfg(feature = "eviction")]
    fn mark_evicted(&self) {
        self.evicted.store(true, Ordering::Relaxed);
    }

    /// Allocates a fresh recording buffer and registers it on this series.
    ///
    /// NOTE: despite the name, this always creates a new buffer — reuse/dedup
    /// is the job of the caller's thread-local `SERIES_BUF_CACHE`.
    fn get_or_create_buf(&self) -> Arc<ExpBuckets> {
        let buf = Arc::new(ExpBuckets::new());
        self.registry.lock().push(Arc::clone(&buf));
        buf
    }

    /// Total observation count, summed across every per-thread buffer.
    fn count(&self) -> u64 {
        self.registry.lock().iter().map(|buf| buf.get_count()).sum()
    }

    /// Total observed sum, summed across every per-thread buffer.
    fn sum(&self) -> u64 {
        self.registry.lock().iter().map(|buf| buf.get_sum()).sum()
    }

    /// Merges all per-thread buffers into one consistent-enough snapshot.
    ///
    /// Buffers keep recording concurrently, so the snapshot is only as
    /// consistent as a single pass over the registry under its lock.
    fn buckets_snapshot(&self) -> ExpBucketsSnapshot {
        let mut positive = [0u64; 64];
        let mut zero_count = 0u64;
        let mut sum = 0u64;
        let mut count = 0u64;

        let registry = self.registry.lock();
        for buf in registry.iter() {
            let thread_buckets = buf.get_positive_buckets();
            // assumes ExpBuckets exposes at most 64 positive buckets,
            // matching the fixed array above — TODO confirm in exp_buckets
            for (i, &c) in thread_buckets.iter().enumerate() {
                positive[i] += c;
            }
            zero_count += buf.get_zero_count();
            sum += buf.get_sum();
            count += buf.get_count();
        }

        ExpBucketsSnapshot {
            positive,
            zero_count,
            sum,
            count,
        }
    }
}
116
/// Cheap, cloneable handle to one series: pairs the shared series state
/// with the recording buffer obtained when the handle was created.
#[derive(Clone)]
pub struct DynamicDistributionSeries {
    /// Shared per-label-set state (aggregation, eviction flag).
    series: Arc<DistributionSeries>,
    /// Recording buffer captured at handle creation; `record` writes here
    /// without touching the shared index.
    buf: Arc<ExpBuckets>,
}
127
128impl DynamicDistributionSeries {
129 #[inline]
131 pub fn record(&self, value: u64) {
132 self.buf.record(value);
133 }
134
135 pub fn count(&self) -> u64 {
137 self.series.count()
138 }
139
140 pub fn sum(&self) -> u64 {
142 self.series.sum()
143 }
144
145 #[inline]
147 pub fn is_evicted(&self) -> bool {
148 self.series.is_evicted()
149 }
150}
151
/// One entry of the thread-local hot-path cache: the labels exactly as the
/// caller passed them, plus the resolved series and buffer.
struct SeriesCacheEntry {
    /// Id of the owning `DynamicDistribution` (distributions share the cache).
    distribution_id: usize,
    /// Labels in caller-supplied order; compared positionally, so a caller
    /// that reorders labels misses the cache (but still resolves correctly).
    ordered_labels: Vec<(String, String)>,
    /// Weak so the cache never keeps an evicted series alive.
    series: Weak<DistributionSeries>,
    /// Strong reference to this thread's recording buffer for the series.
    buf: Arc<ExpBuckets>,
}
158
thread_local! {
    // Single-entry most-recently-used cache: skips hashing and shard lookup
    // when the same labels are recorded repeatedly from one thread.
    static SERIES_CACHE: RefCell<Option<SeriesCacheEntry>> = const { RefCell::new(None) };
    // Per-thread (distribution id, series id) -> buffer map, kept as a small
    // linearly-scanned Vec; entries for dropped buffers are pruned on access.
    static SERIES_BUF_CACHE: RefCell<Vec<(usize, usize, Weak<ExpBuckets>)>> = const { RefCell::new(Vec::new()) };
}
163
/// A histogram-style metric keyed by dynamic label sets.
///
/// Series live in a sharded index keyed by canonicalized labels. Total
/// cardinality is capped at `max_series`; once reached, new label sets are
/// rerouted to a single overflow series.
pub struct DynamicDistribution {
    /// Unique id from `DISTRIBUTION_IDS`; keys the thread-local caches.
    id: usize,
    /// Series cap before overflow routing kicks in (0 disables the cap).
    max_series: usize,
    /// `shard_count - 1`; shard_count is a power of two, so this masks a
    /// hash down to a shard index.
    shard_mask: usize,
    index_shards: Vec<DistributionIndexShard>,
    /// Approximate live-series count (relaxed atomics; may briefly drift
    /// under concurrent insert/evict).
    series_count: AtomicUsize,
    /// Number of capacity-miss events that were routed to the overflow series.
    overflow_count: AtomicU64,
}
177
impl DynamicDistribution {
    /// Creates a distribution with `shard_count` index shards (rounded up to
    /// a power of two) and the default series cap.
    pub fn new(shard_count: usize) -> Self {
        Self::with_max_series(shard_count, DEFAULT_MAX_SERIES)
    }

    /// Creates a distribution with an explicit series cap.
    /// `max_series == 0` disables overflow routing entirely.
    pub fn with_max_series(shard_count: usize, max_series: usize) -> Self {
        let shard_count = shard_count.next_power_of_two();
        let id = DISTRIBUTION_IDS.fetch_add(1, Ordering::Relaxed);
        Self {
            id,
            max_series,
            shard_mask: shard_count - 1,
            index_shards: (0..shard_count)
                .map(|_| CachePadded::new(RwLock::new(HashMap::new())))
                .collect(),
            series_count: AtomicUsize::new(0),
            overflow_count: AtomicU64::new(0),
        }
    }

    /// Resolves `labels` to a reusable series handle, creating the series
    /// (or routing to overflow) on first use. Hot path hits the thread-local
    /// cache and avoids the shard locks entirely.
    pub fn series(&self, labels: &[(&str, &str)]) -> DynamicDistributionSeries {
        if let Some((series, buf)) = self.cached_series(labels) {
            return DynamicDistributionSeries { series, buf };
        }
        let series = self.lookup_or_create(labels);
        let buf = self.get_or_create_thread_buf(&series);
        self.update_cache(labels, Arc::clone(&series), Arc::clone(&buf));
        DynamicDistributionSeries { series, buf }
    }

    /// Records one observation under `labels`; equivalent to
    /// `self.series(labels).record(value)` but skips building a handle.
    #[inline]
    pub fn record(&self, labels: &[(&str, &str)], value: u64) {
        if let Some((_series, buf)) = self.cached_series(labels) {
            buf.record(value);
            return;
        }

        let series = self.lookup_or_create(labels);
        let buf = self.get_or_create_thread_buf(&series);
        self.update_cache(labels, Arc::clone(&series), Arc::clone(&buf));
        buf.record(value);
    }

    /// Observation count for the exact label set, or 0 if absent.
    /// Data rerouted to overflow is only visible under the overflow labels.
    pub fn count(&self, labels: &[(&str, &str)]) -> u64 {
        let key = DynamicLabelSet::from_pairs(labels);
        let index_shard = self.index_shard_for(&key);
        self.index_shards[index_shard]
            .read()
            .get(&key)
            .map(|series| series.count())
            .unwrap_or(0)
    }

    /// Observed sum for the exact label set, or 0 if absent.
    pub fn sum(&self, labels: &[(&str, &str)]) -> u64 {
        let key = DynamicLabelSet::from_pairs(labels);
        let index_shard = self.index_shard_for(&key);
        self.index_shards[index_shard]
            .read()
            .get(&key)
            .map(|series| series.sum())
            .unwrap_or(0)
    }

    /// Snapshots every series. Shards are read-locked one at a time, so the
    /// result is per-shard consistent, not globally atomic.
    pub fn snapshot(&self) -> Vec<DistributionSnapshotEntry> {
        let mut out = Vec::new();
        for shard in &self.index_shards {
            let guard = shard.read();
            for (labels, series) in guard.iter() {
                let snap = series.buckets_snapshot();
                out.push((labels.clone(), snap.count, snap.sum, snap));
            }
        }
        out
    }

    /// Number of live series, including the overflow series if present.
    pub fn cardinality(&self) -> usize {
        self.index_shards
            .iter()
            .map(|shard| shard.read().len())
            .sum()
    }

    /// Number of lookups that were rerouted to the overflow series because
    /// the distribution was at capacity.
    pub fn overflow_count(&self) -> u64 {
        self.overflow_count.load(Ordering::Relaxed)
    }

    /// Visits every series without allocating a snapshot Vec; same per-shard
    /// consistency caveat as `snapshot`.
    #[doc(hidden)]
    pub fn visit_series(
        &self,
        mut f: impl FnMut(&[(String, String)], u64, u64, ExpBucketsSnapshot),
    ) {
        for shard in &self.index_shards {
            let guard = shard.read();
            for (labels, series) in guard.iter() {
                let snap = series.buckets_snapshot();
                f(labels.pairs(), snap.count, snap.sum, snap);
            }
        }
    }

    /// Removes series not touched for more than `max_staleness` cycles and
    /// returns how many were removed. Series with outstanding strong handles
    /// are never evicted, even if stale.
    #[cfg(feature = "eviction")]
    pub fn evict_stale(&self, max_staleness: u32) -> usize {
        let cycle = current_cycle();
        let mut removed = 0;

        for shard in &self.index_shards {
            let mut guard = shard.write();
            guard.retain(|_labels, series| {
                // The map itself holds one strong reference; more than one
                // means a caller still owns a handle — keep the series.
                if Arc::strong_count(series) > 1 {
                    return true;
                }
                let last = series.last_accessed_cycle.load(Ordering::Relaxed);
                let stale = cycle.saturating_sub(last) > max_staleness;
                if stale {
                    // Flag before dropping so thread-local caches that still
                    // hold a Weak see the eviction and invalidate.
                    series.mark_evicted();
                    removed += 1;
                    self.series_count.fetch_sub(1, Ordering::Relaxed);
                }
                !stale
            });
        }

        removed
    }

    /// Slow path: resolves (or creates) the series for `labels`.
    ///
    /// Protocol: optimistic read of the requested shard; then the capacity
    /// check (possibly swapping in the overflow key, which may hash to a
    /// different shard); then a second read on the final key; finally the
    /// double-checked write-lock insert. The capacity check is racy by
    /// design — concurrent creators can briefly exceed `max_series` by a
    /// few series.
    fn lookup_or_create(&self, labels: &[(&str, &str)]) -> Arc<DistributionSeries> {
        let requested_key = DynamicLabelSet::from_pairs(labels);
        let requested_shard = self.index_shard_for(&requested_key);
        #[cfg(feature = "eviction")]
        let cycle = current_cycle();

        if let Some(series) = self.index_shards[requested_shard]
            .read()
            .get(&requested_key)
        {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }

        let key = if self.max_series > 0
            && self.series_count.load(Ordering::Relaxed) >= self.max_series
        {
            // Counted per slow-path miss at capacity, not per recording:
            // once a thread caches the overflow series it stops incrementing.
            self.overflow_count.fetch_add(1, Ordering::Relaxed);
            DynamicLabelSet::from_pairs(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)])
        } else {
            requested_key
        };
        let shard = self.index_shard_for(&key);

        if let Some(series) = self.index_shards[shard].read().get(&key) {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }

        // Re-check under the write lock: another thread may have inserted
        // between dropping the read guard and acquiring the write guard.
        let mut guard = self.index_shards[shard].write();
        if let Some(series) = guard.get(&key) {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }
        #[cfg(feature = "eviction")]
        let series = Arc::new(DistributionSeries::new(cycle));
        #[cfg(not(feature = "eviction"))]
        let series = Arc::new(DistributionSeries::new());
        guard.insert(key, Arc::clone(&series));
        self.series_count.fetch_add(1, Ordering::Relaxed);
        series
    }

    /// Maps a label set onto a shard via its hash and the power-of-two mask.
    fn index_shard_for(&self, key: &DynamicLabelSet) -> usize {
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        key.hash(&mut hasher);
        (hasher.finish() as usize) & self.shard_mask
    }

    /// Hot-path lookup against the thread-local MRU cache.
    ///
    /// Matches labels positionally in caller-supplied order (a reordered
    /// call misses the cache but is still correct via the slow path), and
    /// rejects entries whose series was dropped or evicted.
    fn cached_series(
        &self,
        labels: &[(&str, &str)],
    ) -> Option<(Arc<DistributionSeries>, Arc<ExpBuckets>)> {
        SERIES_CACHE.with(|cache| {
            let cache_ref = cache.borrow();
            let entry = cache_ref.as_ref()?;
            if entry.distribution_id != self.id {
                return None;
            }
            if entry.ordered_labels.len() != labels.len() {
                return None;
            }
            for (idx, (k, v)) in labels.iter().enumerate() {
                let (ek, ev) = &entry.ordered_labels[idx];
                if ek != k || ev != v {
                    return None;
                }
            }
            let series = entry.series.upgrade()?;
            if series.is_evicted() {
                return None;
            }
            #[cfg(feature = "eviction")]
            series.touch(current_cycle());
            Some((series, Arc::clone(&entry.buf)))
        })
    }

    /// Replaces the thread-local MRU cache entry with this resolution.
    fn update_cache(
        &self,
        labels: &[(&str, &str)],
        series: Arc<DistributionSeries>,
        buf: Arc<ExpBuckets>,
    ) {
        SERIES_CACHE.with(|cache| {
            let ordered_labels = labels
                .iter()
                .map(|(k, v)| ((*k).to_string(), (*v).to_string()))
                .collect();
            *cache.borrow_mut() = Some(SeriesCacheEntry {
                distribution_id: self.id,
                ordered_labels,
                series: Arc::downgrade(&series),
                buf,
            });
        });
    }

    /// Finds this thread's recording buffer for `series`, creating and
    /// caching one if this thread has not recorded into it before.
    fn get_or_create_thread_buf(&self, series: &Arc<DistributionSeries>) -> Arc<ExpBuckets> {
        let dist_id = self.id;
        let series_id = series.id;

        SERIES_BUF_CACHE.with(|cache| {
            let mut entries = cache.borrow_mut();
            // Prune entries whose buffer (and hence series) is already gone.
            entries.retain(|(_id, _ptr, weak)| weak.strong_count() > 0);

            for (id, ptr, weak) in entries.iter() {
                if *id == dist_id
                    && *ptr == series_id
                    && let Some(buf) = weak.upgrade()
                {
                    return buf;
                }
            }

            let buf = series.get_or_create_buf();
            entries.push((dist_id, series_id, Arc::downgrade(&buf)));
            buf
        })
    }
}
461
#[cfg(test)]
mod tests {
    use super::*;

    /// `record` aggregates count and sum per label set.
    #[test]
    fn test_basic_recording() {
        let dist = DynamicDistribution::new(4);
        let labels = &[("org_id", "42")];

        dist.record(labels, 100);
        dist.record(labels, 200);
        dist.record(labels, 300);

        assert_eq!(dist.count(labels), 3);
        assert_eq!(dist.sum(labels), 600);
    }

    /// Label order does not matter: `DynamicLabelSet::from_pairs`
    /// canonicalizes, so permuted labels hit the same series.
    #[test]
    fn test_label_order_is_canonicalized() {
        let dist = DynamicDistribution::new(4);

        dist.record(&[("org_id", "42"), ("endpoint", "abc")], 100);

        assert_eq!(dist.count(&[("endpoint", "abc"), ("org_id", "42")]), 1);
    }

    /// A `series` handle records into the same series the distribution
    /// reads back by labels.
    #[test]
    fn test_series_handle() {
        let dist = DynamicDistribution::new(4);
        let series = dist.series(&[("org_id", "42")]);

        series.record(100);
        series.record(200);

        assert_eq!(series.count(), 2);
        assert_eq!(series.sum(), 300);
        assert_eq!(dist.count(&[("org_id", "42")]), 2);
    }

    /// Distinct label sets produce distinct series and snapshot rows.
    #[test]
    fn test_multiple_label_sets() {
        let dist = DynamicDistribution::new(4);

        dist.record(&[("org_id", "1")], 100);
        dist.record(&[("org_id", "2")], 200);

        assert_eq!(dist.count(&[("org_id", "1")]), 1);
        assert_eq!(dist.count(&[("org_id", "2")]), 1);

        let snap = dist.snapshot();
        assert_eq!(snap.len(), 2);
    }

    /// At capacity, a new label set is rerouted to the synthetic overflow
    /// series instead of growing cardinality unboundedly.
    #[test]
    fn test_overflow_bucket_routes_new_series_at_capacity() {
        let dist = DynamicDistribution::with_max_series(4, 2);

        dist.record(&[("org_id", "1")], 100);
        dist.record(&[("org_id", "2")], 200);
        dist.record(&[("org_id", "3")], 300);

        // 2 real series + 1 overflow series.
        assert_eq!(dist.cardinality(), 3);
        assert!(dist.overflow_count() > 0);
        assert_eq!(dist.count(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)]), 1);
        assert_eq!(dist.sum(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)]), 300);
    }

    /// Snapshots carry populated exponential buckets alongside count/sum.
    #[test]
    fn test_snapshot_includes_buckets() {
        let dist = DynamicDistribution::new(4);
        dist.record(&[("org_id", "1")], 100);
        dist.record(&[("org_id", "1")], 200);

        let snap = dist.snapshot();
        assert_eq!(snap.len(), 1);
        let (_, count, sum, bucket_snap) = &snap[0];
        assert_eq!(*count, 2);
        assert_eq!(*sum, 300);
        // 100 and 200 land around buckets 6/7 (2^6..2^8 range).
        assert!(bucket_snap.positive[6] > 0 || bucket_snap.positive[7] > 0);
    }
}
544}