1#[cfg(feature = "eviction")]
4use super::current_cycle;
5use super::{COUNTER_IDS, DynamicLabelSet, thread_id};
6use crossbeam_utils::CachePadded;
7use parking_lot::RwLock;
8use std::cell::RefCell;
9use std::collections::HashMap;
10use std::hash::{Hash, Hasher};
11#[cfg(feature = "eviction")]
12use std::sync::atomic::AtomicU32;
13use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicU64, AtomicUsize, Ordering};
14use std::sync::{Arc, Weak};
15
16const DEFAULT_MAX_SERIES: usize = 2000;
17const OVERFLOW_LABEL_KEY: &str = "__ft_overflow";
18const OVERFLOW_LABEL_VALUE: &str = "true";
19
20struct CounterSeries {
21 cells: Vec<CachePadded<AtomicIsize>>,
22 evicted: AtomicBool,
25 #[cfg(feature = "eviction")]
28 last_accessed_cycle: AtomicU32,
29}
30
31type CounterIndexShard = CachePadded<RwLock<HashMap<DynamicLabelSet, Arc<CounterSeries>>>>;
32
33impl CounterSeries {
34 #[cfg(feature = "eviction")]
35 fn new(shard_count: usize, current_cycle: u32) -> Self {
36 Self {
37 cells: (0..shard_count)
38 .map(|_| CachePadded::new(AtomicIsize::new(0)))
39 .collect(),
40 evicted: AtomicBool::new(false),
41 last_accessed_cycle: AtomicU32::new(current_cycle),
42 }
43 }
44
45 #[cfg(not(feature = "eviction"))]
46 fn new(shard_count: usize) -> Self {
47 Self {
48 cells: (0..shard_count)
49 .map(|_| CachePadded::new(AtomicIsize::new(0)))
50 .collect(),
51 evicted: AtomicBool::new(false),
52 }
53 }
54
55 #[inline]
56 fn add_at(&self, shard_idx: usize, value: isize) {
57 self.cells[shard_idx].fetch_add(value, Ordering::Relaxed);
58 }
61
62 #[cfg(feature = "eviction")]
64 #[inline]
65 fn touch(&self, cycle: u32) {
66 self.last_accessed_cycle.store(cycle, Ordering::Relaxed);
67 }
68
69 #[inline]
70 fn sum(&self) -> isize {
71 self.cells
72 .iter()
73 .map(|cell| cell.load(Ordering::Relaxed))
74 .sum()
75 }
76
77 #[inline]
78 fn is_evicted(&self) -> bool {
79 self.evicted.load(Ordering::Relaxed)
80 }
81
82 #[cfg(feature = "eviction")]
83 fn mark_evicted(&self) {
84 self.evicted.store(true, Ordering::Relaxed);
85 }
86}
87
88#[derive(Clone)]
94pub struct DynamicCounterSeries {
95 series: Arc<CounterSeries>,
96 shard_mask: usize,
97}
98
99impl DynamicCounterSeries {
100 #[inline]
102 pub fn inc(&self) {
103 self.add(1);
104 }
105
106 #[inline]
108 pub fn add(&self, value: isize) {
109 let shard_idx = thread_id() & self.shard_mask;
110 self.series.add_at(shard_idx, value);
111 }
112
113 #[inline]
115 pub fn get(&self) -> isize {
116 self.series.sum()
117 }
118
119 #[inline]
125 pub fn is_evicted(&self) -> bool {
126 self.series.is_evicted()
127 }
128}
129
130struct SeriesCacheEntry {
131 counter_id: usize,
132 ordered_labels: Vec<(String, String)>,
133 series: Weak<CounterSeries>,
134}
135
136thread_local! {
137 static SERIES_CACHE: RefCell<Option<SeriesCacheEntry>> = const { RefCell::new(None) };
138}
139
140pub struct DynamicCounter {
145 id: usize,
146 shard_count: usize,
147 max_series: usize,
148 shard_mask: usize,
149 index_shards: Vec<CounterIndexShard>,
150 series_count: AtomicUsize,
152 overflow_count: AtomicU64,
154}
155
156impl DynamicCounter {
157 pub fn new(shard_count: usize) -> Self {
159 Self::with_max_series(shard_count, DEFAULT_MAX_SERIES)
160 }
161
162 pub fn with_max_series(shard_count: usize, max_series: usize) -> Self {
170 let shard_count = shard_count.next_power_of_two();
171 let id = COUNTER_IDS.fetch_add(1, Ordering::Relaxed);
172 Self {
173 id,
174 shard_count,
175 max_series,
176 shard_mask: shard_count - 1,
177 index_shards: (0..shard_count)
178 .map(|_| CachePadded::new(RwLock::new(HashMap::new())))
179 .collect(),
180 series_count: AtomicUsize::new(0),
181 overflow_count: AtomicU64::new(0),
182 }
183 }
184
185 pub fn series(&self, labels: &[(&str, &str)]) -> DynamicCounterSeries {
189 if let Some(series) = self.cached_series(labels) {
190 return DynamicCounterSeries {
191 series,
192 shard_mask: self.shard_mask,
193 };
194 }
195 let series = self.lookup_or_create(labels);
196 self.update_cache(labels, Arc::clone(&series));
197 DynamicCounterSeries {
198 series,
199 shard_mask: self.shard_mask,
200 }
201 }
202
203 #[inline]
205 pub fn inc(&self, labels: &[(&str, &str)]) {
206 self.add(labels, 1);
207 }
208
209 #[inline]
211 pub fn add(&self, labels: &[(&str, &str)], value: isize) {
212 if let Some(series) = self.cached_series(labels) {
213 let shard_idx = thread_id() & self.shard_mask;
214 series.add_at(shard_idx, value);
215 return;
216 }
217
218 let series = self.lookup_or_create(labels);
219 self.update_cache(labels, Arc::clone(&series));
220 let shard_idx = thread_id() & self.shard_mask;
221 series.add_at(shard_idx, value);
222 }
223
224 pub fn get(&self, labels: &[(&str, &str)]) -> isize {
226 let key = DynamicLabelSet::from_pairs(labels);
227 let index_shard = self.index_shard_for(&key);
228 self.index_shards[index_shard]
229 .read()
230 .get(&key)
231 .map(|series| series.sum())
232 .unwrap_or(0)
233 }
234
235 pub fn sum_all(&self) -> isize {
237 self.snapshot().into_iter().map(|(_, value)| value).sum()
238 }
239
240 pub fn snapshot(&self) -> Vec<(DynamicLabelSet, isize)> {
242 let mut out = Vec::new();
243 for shard in &self.index_shards {
244 let guard = shard.read();
245 for (labels, series) in guard.iter() {
246 out.push((labels.clone(), series.sum()));
247 }
248 }
249 out
250 }
251
252 pub fn cardinality(&self) -> usize {
254 self.index_shards
255 .iter()
256 .map(|shard| shard.read().len())
257 .sum()
258 }
259
260 pub fn overflow_count(&self) -> u64 {
265 self.overflow_count.load(Ordering::Relaxed)
266 }
267
268 #[doc(hidden)]
273 pub fn visit_series(&self, mut f: impl FnMut(&[(String, String)], isize)) {
274 for shard in &self.index_shards {
275 let guard = shard.read();
276 for (labels, series) in guard.iter() {
277 f(labels.pairs(), series.sum());
278 }
279 }
280 }
281
282 #[cfg(feature = "eviction")]
293 pub fn evict_stale(&self, max_staleness: u32) -> usize {
294 let cycle = current_cycle();
295 let mut removed = 0;
296
297 for shard in &self.index_shards {
298 let mut guard = shard.write();
299 guard.retain(|_labels, series| {
300 if Arc::strong_count(series) > 1 {
303 return true;
304 }
305 let last = series.last_accessed_cycle.load(Ordering::Relaxed);
307 let stale = cycle.saturating_sub(last) > max_staleness;
308 if stale {
309 series.mark_evicted();
310 removed += 1;
311 self.series_count.fetch_sub(1, Ordering::Relaxed);
312 }
313 !stale
314 });
315 }
316
317 removed
318 }
319
320 fn lookup_or_create(&self, labels: &[(&str, &str)]) -> Arc<CounterSeries> {
321 let requested_key = DynamicLabelSet::from_pairs(labels);
322 let requested_shard = self.index_shard_for(&requested_key);
323 #[cfg(feature = "eviction")]
324 let cycle = current_cycle();
325
326 if let Some(series) = self.index_shards[requested_shard]
328 .read()
329 .get(&requested_key)
330 {
331 #[cfg(feature = "eviction")]
332 series.touch(cycle);
333 return Arc::clone(series);
334 }
335
336 let key = if self.max_series > 0
341 && self.series_count.load(Ordering::Relaxed) >= self.max_series
342 {
343 self.overflow_count.fetch_add(1, Ordering::Relaxed);
344 DynamicLabelSet::from_pairs(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)])
345 } else {
346 requested_key
347 };
348 let shard = self.index_shard_for(&key);
349
350 if let Some(series) = self.index_shards[shard].read().get(&key) {
352 #[cfg(feature = "eviction")]
353 series.touch(cycle);
354 return Arc::clone(series);
355 }
356
357 let mut guard = self.index_shards[shard].write();
358 if let Some(series) = guard.get(&key) {
359 #[cfg(feature = "eviction")]
360 series.touch(cycle);
361 return Arc::clone(series);
362 }
363 #[cfg(feature = "eviction")]
364 let series = Arc::new(CounterSeries::new(self.shard_count, cycle));
365 #[cfg(not(feature = "eviction"))]
366 let series = Arc::new(CounterSeries::new(self.shard_count));
367 guard.insert(key, Arc::clone(&series));
368 self.series_count.fetch_add(1, Ordering::Relaxed);
369 series
370 }
371
372 fn index_shard_for(&self, key: &DynamicLabelSet) -> usize {
373 let mut hasher = std::collections::hash_map::DefaultHasher::new();
374 key.hash(&mut hasher);
375 (hasher.finish() as usize) & self.shard_mask
376 }
377
378 fn cached_series(&self, labels: &[(&str, &str)]) -> Option<Arc<CounterSeries>> {
379 SERIES_CACHE.with(|cache| {
380 let cache_ref = cache.borrow();
381 let entry = cache_ref.as_ref()?;
382 if entry.counter_id != self.id {
383 return None;
384 }
385 if entry.ordered_labels.len() != labels.len() {
386 return None;
387 }
388 for (idx, (k, v)) in labels.iter().enumerate() {
389 let (ek, ev) = &entry.ordered_labels[idx];
390 if ek != k || ev != v {
391 return None;
392 }
393 }
394 let series = entry.series.upgrade()?;
395 if series.is_evicted() {
396 return None;
397 }
398 #[cfg(feature = "eviction")]
399 series.touch(current_cycle());
400 Some(series)
401 })
402 }
403
404 fn update_cache(&self, labels: &[(&str, &str)], series: Arc<CounterSeries>) {
405 SERIES_CACHE.with(|cache| {
406 let ordered_labels = labels
407 .iter()
408 .map(|(k, v)| ((*k).to_string(), (*v).to_string()))
409 .collect();
410 *cache.borrow_mut() = Some(SeriesCacheEntry {
411 counter_id: self.id,
412 ordered_labels,
413 series: Arc::downgrade(&series),
414 });
415 });
416 }
417}
418
419#[cfg(test)]
420mod tests {
421 #[cfg(feature = "eviction")]
422 use super::super::advance_cycle;
423 use super::*;
424
425 #[test]
426 fn test_basic_operations() {
427 let counter = DynamicCounter::new(4);
428 counter.inc(&[("org_id", "42"), ("endpoint_uuid", "abc")]);
429 counter.add(&[("org_id", "42"), ("endpoint_uuid", "abc")], 2);
430
431 assert_eq!(
432 counter.get(&[("org_id", "42"), ("endpoint_uuid", "abc")]),
433 3
434 );
435 assert_eq!(counter.sum_all(), 3);
436 }
437
438 #[test]
439 fn test_label_order_is_canonicalized() {
440 let counter = DynamicCounter::new(4);
441 counter.inc(&[("org_id", "42"), ("endpoint_uuid", "abc")]);
442
443 assert_eq!(
444 counter.get(&[("endpoint_uuid", "abc"), ("org_id", "42")]),
445 1
446 );
447 }
448
449 #[test]
450 fn test_series_handle() {
451 let counter = DynamicCounter::new(4);
452 let series = counter.series(&[("org_id", "42"), ("endpoint_uuid", "abc")]);
453 series.inc();
454 series.add(9);
455
456 assert_eq!(series.get(), 10);
457 assert_eq!(
458 counter.get(&[("org_id", "42"), ("endpoint_uuid", "abc")]),
459 10
460 );
461 }
462
463 #[test]
464 fn test_concurrent_adds() {
465 let counter = DynamicCounter::new(8);
466 let series = counter.series(&[("org_id", "42"), ("endpoint_uuid", "abc")]);
467
468 std::thread::scope(|s| {
469 for _ in 0..8 {
470 let series = series.clone();
471 s.spawn(move || {
472 for _ in 0..10_000 {
473 series.inc();
474 }
475 });
476 }
477 });
478
479 assert_eq!(
480 counter.get(&[("org_id", "42"), ("endpoint_uuid", "abc")]),
481 80_000
482 );
483 }
484
485 #[cfg(feature = "eviction")]
486 #[test]
487 fn test_evict_stale() {
488 let counter = DynamicCounter::new(4);
489 let labels = &[("org_id", "42")];
490
491 counter.inc(labels);
493 assert_eq!(counter.cardinality(), 1);
494 assert_eq!(counter.get(labels), 1);
495
496 advance_cycle();
498 advance_cycle();
499
500 counter.inc(&[("flush", "cache")]);
502
503 let removed = counter.evict_stale(1);
505 assert_eq!(removed, 1); assert_eq!(counter.cardinality(), 1); assert_eq!(counter.get(labels), 0);
510
511 counter.inc(labels);
513 assert_eq!(counter.cardinality(), 2);
514 assert_eq!(counter.get(labels), 1);
515 }
516
517 #[cfg(feature = "eviction")]
518 #[test]
519 fn test_evict_stale_keeps_active() {
520 let counter = DynamicCounter::new(4);
521 let active = &[("status", "active")];
522 let stale = &[("status", "stale")];
523
524 counter.inc(active);
526 counter.inc(stale);
527 assert_eq!(counter.cardinality(), 2);
528
529 advance_cycle();
531
532 counter.inc(active);
534
535 advance_cycle();
537
538 let removed = counter.evict_stale(1);
540 assert_eq!(removed, 1);
541 assert_eq!(counter.cardinality(), 1);
542 assert_eq!(counter.get(active), 2);
543 assert_eq!(counter.get(stale), 0);
544 }
545
546 #[cfg(feature = "eviction")]
547 #[test]
548 fn test_eviction_tombstone_invalidates_cache() {
549 let counter = DynamicCounter::new(4);
550 let labels = &[("org_id", "evict_test")];
551
552 counter.inc(labels);
554 counter.inc(labels); assert_eq!(counter.get(labels), 2);
556
557 advance_cycle();
559 advance_cycle();
560
561 counter.inc(&[("flush", "cache")]);
563
564 counter.evict_stale(1);
565
566 counter.inc(labels);
568 assert_eq!(counter.get(labels), 1); }
570
571 #[cfg(feature = "eviction")]
572 #[test]
573 fn test_series_handle_protects_from_eviction() {
574 let counter = DynamicCounter::new(4);
575 let labels = &[("org_id", "handle_test")];
576
577 let series = counter.series(labels);
579 series.inc();
580 assert!(!series.is_evicted());
581
582 advance_cycle();
584 advance_cycle();
585 let removed = counter.evict_stale(1);
586
587 assert_eq!(removed, 0);
589 assert!(!series.is_evicted());
590 assert_eq!(counter.cardinality(), 1);
591 assert_eq!(counter.get(labels), 1);
592
593 series.inc();
595 assert_eq!(counter.get(labels), 2);
596 }
597
598 #[cfg(feature = "eviction")]
599 #[test]
600 fn test_series_evicted_after_handle_dropped() {
601 let counter = DynamicCounter::new(4);
602 let labels = &[("org_id", "handle_drop_test")];
603
604 {
606 let series = counter.series(labels);
607 series.inc();
608 }
609 assert_eq!(counter.cardinality(), 1);
612 assert_eq!(counter.get(labels), 1);
613
614 advance_cycle();
616 advance_cycle();
617
618 counter.inc(&[("flush", "cache")]);
620
621 let removed = counter.evict_stale(1);
623 assert_eq!(removed, 1);
624 assert_eq!(counter.get(labels), 0);
625 }
626
627 #[test]
628 fn test_overflow_bucket_routes_new_series_at_capacity() {
629 let counter = DynamicCounter::with_max_series(4, 2);
630
631 counter.inc(&[("org_id", "1")]);
632 counter.inc(&[("org_id", "2")]);
633 counter.inc(&[("org_id", "3")]);
634
635 assert_eq!(counter.cardinality(), 3);
636 assert_eq!(
637 counter.get(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)]),
638 1
639 );
640 }
641
642 #[test]
643 fn test_concurrent_cap_bounded_overshoot() {
644 use std::sync::{Arc, Barrier};
645 use std::thread;
646
647 let cap = 10;
648 let threads = 16;
649 let counter = Arc::new(DynamicCounter::with_max_series(4, cap));
650 let barrier = Arc::new(Barrier::new(threads));
651
652 let handles: Vec<_> = (0..threads)
653 .map(|t| {
654 let counter = Arc::clone(&counter);
655 let barrier = Arc::clone(&barrier);
656 thread::spawn(move || {
657 barrier.wait();
658 for i in 0..5 {
660 let label = format!("t{t}_s{i}");
661 counter.inc(&[("key", &label)]);
662 }
663 })
664 })
665 .collect();
666
667 for h in handles {
668 h.join().unwrap();
669 }
670
671 let card = counter.cardinality();
672 assert!(
675 card <= cap + threads + 1, "cardinality {card} exceeded bounded overshoot (cap={cap}, threads={threads})"
677 );
678 assert!(
680 counter.overflow_count() > 0,
681 "overflow should have triggered"
682 );
683 }
684
685 #[cfg(feature = "eviction")]
686 #[test]
687 fn test_eviction_and_reinsertion_bookkeeping() {
688 let counter = DynamicCounter::with_max_series(4, 3);
689
690 counter.inc(&[("k", "a")]);
691 counter.inc(&[("k", "b")]);
692 counter.inc(&[("k", "c")]);
693 assert_eq!(counter.cardinality(), 3);
694
695 counter.inc(&[("k", "d")]);
696 assert!(counter.overflow_count() > 0);
697 let card_after_overflow = counter.cardinality();
698 assert!(card_after_overflow <= 4);
699
700 advance_cycle();
701 advance_cycle();
702 advance_cycle();
703 counter.inc(&[("flush", "cache")]);
704 let evicted = counter.evict_stale(1);
705 assert!(evicted > 0);
706
707 let card_after_evict = counter.cardinality();
708 assert!(
709 card_after_evict < card_after_overflow,
710 "cardinality should decrease after eviction: before={card_after_overflow} after={card_after_evict}"
711 );
712
713 let overflow_before = counter.overflow_count();
714 counter.inc(&[("k", "new1")]);
715 counter.inc(&[("k", "new2")]);
716
717 assert!(counter.cardinality() <= 5);
718
719 let overflow_after = counter.overflow_count();
720 assert!(
721 overflow_after - overflow_before <= 1,
722 "unexpected overflow after eviction freed space"
723 );
724 }
725}