1use super::cache::{CacheableSeries, LabelCache, SERIES_CACHE_SIZE};
4#[cfg(feature = "eviction")]
5use super::current_cycle;
6use super::{COUNTER_IDS, DynamicLabelSet, thread_id};
7use crossbeam_utils::CachePadded;
8use parking_lot::RwLock;
9use std::cell::RefCell;
10use std::collections::HashMap;
11use std::hash::{Hash, Hasher};
12#[cfg(feature = "eviction")]
13use std::sync::atomic::AtomicU32;
14use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicU64, AtomicUsize, Ordering};
15use std::sync::{Arc, Weak};
16
/// Series cap applied by `DynamicCounter::new`.
const DEFAULT_MAX_SERIES: usize = 2000;
/// Label key of the shared overflow series that receives new label sets once
/// the series cap has been reached.
const OVERFLOW_LABEL_KEY: &str = "__ft_overflow";
/// Label value paired with `OVERFLOW_LABEL_KEY`.
const OVERFLOW_LABEL_VALUE: &str = "true";
20
21struct CounterSeries {
22 cells: Vec<CachePadded<AtomicIsize>>,
23 evicted: AtomicBool,
26 #[cfg(feature = "eviction")]
29 last_accessed_cycle: AtomicU32,
30}
31
32type CounterIndexShard = CachePadded<RwLock<HashMap<DynamicLabelSet, Arc<CounterSeries>>>>;
33
34impl CounterSeries {
35 #[cfg(feature = "eviction")]
36 fn new(shard_count: usize, current_cycle: u32) -> Self {
37 Self {
38 cells: (0..shard_count)
39 .map(|_| CachePadded::new(AtomicIsize::new(0)))
40 .collect(),
41 evicted: AtomicBool::new(false),
42 last_accessed_cycle: AtomicU32::new(current_cycle),
43 }
44 }
45
46 #[cfg(not(feature = "eviction"))]
47 fn new(shard_count: usize) -> Self {
48 Self {
49 cells: (0..shard_count)
50 .map(|_| CachePadded::new(AtomicIsize::new(0)))
51 .collect(),
52 evicted: AtomicBool::new(false),
53 }
54 }
55
56 #[inline]
57 fn add_at(&self, shard_idx: usize, value: isize) {
58 self.cells[shard_idx].fetch_add(value, Ordering::Relaxed);
59 }
62
63 #[cfg(feature = "eviction")]
65 #[inline]
66 fn touch(&self, cycle: u32) {
67 self.last_accessed_cycle.store(cycle, Ordering::Relaxed);
68 }
69
70 #[inline]
71 fn sum(&self) -> isize {
72 self.cells
73 .iter()
74 .map(|cell| cell.load(Ordering::Relaxed))
75 .sum()
76 }
77
78 #[inline]
79 fn is_evicted(&self) -> bool {
80 self.evicted.load(Ordering::Relaxed)
81 }
82
83 #[cfg(feature = "eviction")]
84 fn mark_evicted(&self) {
85 self.evicted.store(true, Ordering::Relaxed);
86 }
87}
88
89impl CacheableSeries for CounterSeries {
90 fn is_evicted(&self) -> bool {
91 self.is_evicted()
92 }
93}
94
95#[derive(Clone)]
101pub struct DynamicCounterSeries {
102 series: Arc<CounterSeries>,
103 shard_mask: usize,
104}
105
106impl DynamicCounterSeries {
107 #[inline]
109 pub fn inc(&self) {
110 self.add(1);
111 }
112
113 #[inline]
115 pub fn add(&self, value: isize) {
116 let shard_idx = thread_id() & self.shard_mask;
117 self.series.add_at(shard_idx, value);
118 }
119
120 #[inline]
122 pub fn get(&self) -> isize {
123 self.series.sum()
124 }
125
126 #[inline]
132 pub fn is_evicted(&self) -> bool {
133 self.series.is_evicted()
134 }
135}
136
137thread_local! {
138 static SERIES_CACHE: RefCell<LabelCache<Weak<CounterSeries>, SERIES_CACHE_SIZE>> =
139 RefCell::new(LabelCache::new());
140}
141
142pub struct DynamicCounter {
147 id: usize,
148 shard_count: usize,
149 max_series: usize,
150 shard_mask: usize,
151 index_shards: Vec<CounterIndexShard>,
152 series_count: AtomicUsize,
154 overflow_count: AtomicU64,
156}
157
158impl DynamicCounter {
159 pub fn new(shard_count: usize) -> Self {
161 Self::with_max_series(shard_count, DEFAULT_MAX_SERIES)
162 }
163
164 pub fn with_max_series(shard_count: usize, max_series: usize) -> Self {
172 let shard_count = shard_count.next_power_of_two();
173 let id = COUNTER_IDS.fetch_add(1, Ordering::Relaxed);
174 Self {
175 id,
176 shard_count,
177 max_series,
178 shard_mask: shard_count - 1,
179 index_shards: (0..shard_count)
180 .map(|_| CachePadded::new(RwLock::new(HashMap::new())))
181 .collect(),
182 series_count: AtomicUsize::new(0),
183 overflow_count: AtomicU64::new(0),
184 }
185 }
186
187 pub fn series(&self, labels: &[(&str, &str)]) -> DynamicCounterSeries {
191 if let Some(series) = self.cached_series(labels) {
192 return DynamicCounterSeries {
193 series,
194 shard_mask: self.shard_mask,
195 };
196 }
197 let series = self.lookup_or_create(labels);
198 self.update_cache(labels, &series);
199 DynamicCounterSeries {
200 series,
201 shard_mask: self.shard_mask,
202 }
203 }
204
205 #[inline]
207 pub fn inc(&self, labels: &[(&str, &str)]) {
208 self.add(labels, 1);
209 }
210
211 #[inline]
213 pub fn add(&self, labels: &[(&str, &str)], value: isize) {
214 if let Some(series) = self.cached_series(labels) {
215 let shard_idx = thread_id() & self.shard_mask;
216 series.add_at(shard_idx, value);
217 return;
218 }
219
220 let series = self.lookup_or_create(labels);
221 self.update_cache(labels, &series);
222 let shard_idx = thread_id() & self.shard_mask;
223 series.add_at(shard_idx, value);
224 }
225
226 pub fn get(&self, labels: &[(&str, &str)]) -> isize {
228 let key = DynamicLabelSet::from_pairs(labels);
229 let index_shard = self.index_shard_for(&key);
230 self.index_shards[index_shard]
231 .read()
232 .get(&key)
233 .map(|series| series.sum())
234 .unwrap_or(0)
235 }
236
237 pub fn sum_all(&self) -> isize {
239 self.snapshot().into_iter().map(|(_, value)| value).sum()
240 }
241
242 pub fn snapshot(&self) -> Vec<(DynamicLabelSet, isize)> {
244 let mut out = Vec::new();
245 for shard in &self.index_shards {
246 let guard = shard.read();
247 for (labels, series) in guard.iter() {
248 out.push((labels.clone(), series.sum()));
249 }
250 }
251 out
252 }
253
254 pub fn cardinality(&self) -> usize {
256 self.index_shards
257 .iter()
258 .map(|shard| shard.read().len())
259 .sum()
260 }
261
262 pub fn overflow_count(&self) -> u64 {
267 self.overflow_count.load(Ordering::Relaxed)
268 }
269
270 #[doc(hidden)]
275 pub fn visit_series(&self, mut f: impl FnMut(&[(String, String)], isize)) {
276 for shard in &self.index_shards {
277 let guard = shard.read();
278 for (labels, series) in guard.iter() {
279 f(labels.pairs(), series.sum());
280 }
281 }
282 }
283
284 #[cfg(feature = "eviction")]
295 pub fn evict_stale(&self, max_staleness: u32) -> usize {
296 let cycle = current_cycle();
297 let mut removed = 0;
298
299 for shard in &self.index_shards {
300 let mut guard = shard.write();
301 guard.retain(|_labels, series| {
302 if Arc::strong_count(series) > 1 {
305 return true;
306 }
307 let last = series.last_accessed_cycle.load(Ordering::Relaxed);
309 let stale = cycle.saturating_sub(last) > max_staleness;
310 if stale {
311 series.mark_evicted();
312 removed += 1;
313 self.series_count.fetch_sub(1, Ordering::Relaxed);
314 }
315 !stale
316 });
317 }
318
319 removed
320 }
321
322 fn lookup_or_create(&self, labels: &[(&str, &str)]) -> Arc<CounterSeries> {
323 let requested_key = DynamicLabelSet::from_pairs(labels);
324 let requested_shard = self.index_shard_for(&requested_key);
325 #[cfg(feature = "eviction")]
326 let cycle = current_cycle();
327
328 if let Some(series) = self.index_shards[requested_shard]
330 .read()
331 .get(&requested_key)
332 {
333 #[cfg(feature = "eviction")]
334 series.touch(cycle);
335 return Arc::clone(series);
336 }
337
338 let key = if self.max_series > 0
343 && self.series_count.load(Ordering::Relaxed) >= self.max_series
344 {
345 self.overflow_count.fetch_add(1, Ordering::Relaxed);
346 DynamicLabelSet::from_pairs(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)])
347 } else {
348 requested_key
349 };
350 let shard = self.index_shard_for(&key);
351
352 if let Some(series) = self.index_shards[shard].read().get(&key) {
354 #[cfg(feature = "eviction")]
355 series.touch(cycle);
356 return Arc::clone(series);
357 }
358
359 let mut guard = self.index_shards[shard].write();
360 if let Some(series) = guard.get(&key) {
361 #[cfg(feature = "eviction")]
362 series.touch(cycle);
363 return Arc::clone(series);
364 }
365 #[cfg(feature = "eviction")]
366 let series = Arc::new(CounterSeries::new(self.shard_count, cycle));
367 #[cfg(not(feature = "eviction"))]
368 let series = Arc::new(CounterSeries::new(self.shard_count));
369 guard.insert(key, Arc::clone(&series));
370 self.series_count.fetch_add(1, Ordering::Relaxed);
371 series
372 }
373
374 fn index_shard_for(&self, key: &DynamicLabelSet) -> usize {
375 let mut hasher = std::collections::hash_map::DefaultHasher::new();
376 key.hash(&mut hasher);
377 (hasher.finish() as usize) & self.shard_mask
378 }
379
380 fn cached_series(&self, labels: &[(&str, &str)]) -> Option<Arc<CounterSeries>> {
381 SERIES_CACHE.with(|cache| {
382 let series = cache.borrow_mut().get(self.id, labels)?;
383 #[cfg(feature = "eviction")]
384 series.touch(current_cycle());
385 Some(series)
386 })
387 }
388
389 fn update_cache(&self, labels: &[(&str, &str)], series: &Arc<CounterSeries>) {
390 SERIES_CACHE.with(|cache| {
391 cache
392 .borrow_mut()
393 .insert(self.id, labels, Arc::downgrade(series));
394 });
395 }
396}
397
#[cfg(test)]
mod tests {
    #[cfg(feature = "eviction")]
    use super::super::advance_cycle;
    use super::*;

    // NOTE(review): `advance_cycle()` takes no arguments, so the cycle state
    // is shared beyond a single counter. If it is process-global, eviction
    // tests running in parallel could age each other's series — confirm.

    /// Label set shared by the basic, ordering, handle, and concurrency tests.
    const ORG_ENDPOINT: &[(&str, &str)] = &[("org_id", "42"), ("endpoint_uuid", "abc")];

    #[test]
    fn test_basic_operations() {
        let counter = DynamicCounter::new(4);
        counter.inc(ORG_ENDPOINT);
        counter.add(ORG_ENDPOINT, 2);

        assert_eq!(counter.get(ORG_ENDPOINT), 3);
        assert_eq!(counter.sum_all(), 3);
    }

    #[test]
    fn test_label_order_is_canonicalized() {
        let counter = DynamicCounter::new(4);
        counter.inc(ORG_ENDPOINT);

        // Same pairs, opposite order: must resolve to the same series.
        let reversed = [("endpoint_uuid", "abc"), ("org_id", "42")];
        assert_eq!(counter.get(&reversed), 1);
    }

    #[test]
    fn test_series_handle() {
        let counter = DynamicCounter::new(4);
        let handle = counter.series(ORG_ENDPOINT);
        handle.inc();
        handle.add(9);

        // The handle and the counter observe the same series.
        assert_eq!(handle.get(), 10);
        assert_eq!(counter.get(ORG_ENDPOINT), 10);
    }

    #[test]
    fn test_concurrent_adds() {
        let counter = DynamicCounter::new(8);
        let handle = counter.series(ORG_ENDPOINT);

        std::thread::scope(|scope| {
            for _ in 0..8 {
                let worker_handle = handle.clone();
                scope.spawn(move || {
                    for _ in 0..10_000 {
                        worker_handle.inc();
                    }
                });
            }
        });

        // 8 threads x 10_000 increments each.
        assert_eq!(counter.get(ORG_ENDPOINT), 80_000);
    }

    #[cfg(feature = "eviction")]
    #[test]
    fn test_evict_stale() {
        let counter = DynamicCounter::new(4);
        let org = &[("org_id", "42")];

        counter.inc(org);
        assert_eq!(counter.cardinality(), 1);
        assert_eq!(counter.get(org), 1);

        // Age the series by two cycles.
        advance_cycle();
        advance_cycle();

        // Touch a different label set; the label text suggests this is meant
        // to displace the thread-local cache entry — confirm against `LabelCache`.
        counter.inc(&[("flush", "cache")]);

        let removed = counter.evict_stale(1);
        // Only the aged series goes; the freshly created one stays.
        assert_eq!(removed, 1);
        assert_eq!(counter.cardinality(), 1);
        assert_eq!(counter.get(org), 0);

        // Re-using the labels creates a fresh series starting from zero.
        counter.inc(org);
        assert_eq!(counter.cardinality(), 2);
        assert_eq!(counter.get(org), 1);
    }

    #[cfg(feature = "eviction")]
    #[test]
    fn test_evict_stale_keeps_active() {
        let counter = DynamicCounter::new(4);
        let active = &[("status", "active")];
        let stale = &[("status", "stale")];

        counter.inc(active);
        counter.inc(stale);
        assert_eq!(counter.cardinality(), 2);

        advance_cycle();

        // Only `active` is touched again after the first cycle bump.
        counter.inc(active);

        advance_cycle();

        let removed = counter.evict_stale(1);
        assert_eq!(removed, 1);
        assert_eq!(counter.cardinality(), 1);
        assert_eq!(counter.get(active), 2);
        assert_eq!(counter.get(stale), 0);
    }

    #[cfg(feature = "eviction")]
    #[test]
    fn test_eviction_tombstone_invalidates_cache() {
        let counter = DynamicCounter::new(4);
        let org = &[("org_id", "evict_test")];

        counter.inc(org);
        counter.inc(org);
        assert_eq!(counter.get(org), 2);

        advance_cycle();
        advance_cycle();

        counter.inc(&[("flush", "cache")]);

        counter.evict_stale(1);

        // The increment must land in a new series, not in the evicted one.
        counter.inc(org);
        assert_eq!(counter.get(org), 1);
    }

    #[cfg(feature = "eviction")]
    #[test]
    fn test_series_handle_protects_from_eviction() {
        let counter = DynamicCounter::new(4);
        let org = &[("org_id", "handle_test")];

        let handle = counter.series(org);
        handle.inc();
        assert!(!handle.is_evicted());

        advance_cycle();
        advance_cycle();
        let removed = counter.evict_stale(1);

        // The live handle holds a strong reference, so nothing is evicted.
        assert_eq!(removed, 0);
        assert!(!handle.is_evicted());
        assert_eq!(counter.cardinality(), 1);
        assert_eq!(counter.get(org), 1);

        handle.inc();
        assert_eq!(counter.get(org), 2);
    }

    #[cfg(feature = "eviction")]
    #[test]
    fn test_series_evicted_after_handle_dropped() {
        let counter = DynamicCounter::new(4);
        let org = &[("org_id", "handle_drop_test")];

        {
            // The handle is dropped at the end of this scope.
            let handle = counter.series(org);
            handle.inc();
        }
        assert_eq!(counter.cardinality(), 1);
        assert_eq!(counter.get(org), 1);

        advance_cycle();
        advance_cycle();

        counter.inc(&[("flush", "cache")]);

        let removed = counter.evict_stale(1);
        assert_eq!(removed, 1);
        assert_eq!(counter.get(org), 0);
    }

    #[test]
    fn test_overflow_bucket_routes_new_series_at_capacity() {
        let counter = DynamicCounter::with_max_series(4, 2);

        counter.inc(&[("org_id", "1")]);
        counter.inc(&[("org_id", "2")]);
        // Third label set arrives at capacity and goes to the overflow series.
        counter.inc(&[("org_id", "3")]);

        // Two regular series plus the overflow series.
        assert_eq!(counter.cardinality(), 3);
        let overflow_labels = [(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)];
        assert_eq!(counter.get(&overflow_labels), 1);
    }

    #[test]
    fn test_concurrent_cap_bounded_overshoot() {
        use std::sync::{Arc, Barrier};
        use std::thread;

        let cap = 10;
        let threads = 16;
        let counter = Arc::new(DynamicCounter::with_max_series(4, cap));
        let barrier = Arc::new(Barrier::new(threads));

        let mut handles = Vec::with_capacity(threads);
        for t in 0..threads {
            let counter = Arc::clone(&counter);
            let barrier = Arc::clone(&barrier);
            handles.push(thread::spawn(move || {
                // Release all threads at once to maximise contention on the cap.
                barrier.wait();
                for i in 0..5 {
                    let label = format!("t{t}_s{i}");
                    counter.inc(&[("key", &label)]);
                }
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        let card = counter.cardinality();
        assert!(
            card <= cap + threads + 1,
            "cardinality {card} exceeded bounded overshoot (cap={cap}, threads={threads})"
        );
        assert!(
            counter.overflow_count() > 0,
            "overflow should have triggered"
        );
    }

    #[cfg(feature = "eviction")]
    #[test]
    fn test_eviction_and_reinsertion_bookkeeping() {
        let counter = DynamicCounter::with_max_series(4, 3);

        // Fill the counter up to its cap.
        for value in ["a", "b", "c"] {
            counter.inc(&[("k", value)]);
        }
        assert_eq!(counter.cardinality(), 3);

        // One more label set must overflow.
        counter.inc(&[("k", "d")]);
        assert!(counter.overflow_count() > 0);
        let card_after_overflow = counter.cardinality();
        assert!(card_after_overflow <= 4);

        advance_cycle();
        advance_cycle();
        advance_cycle();
        counter.inc(&[("flush", "cache")]);
        let evicted = counter.evict_stale(1);
        assert!(evicted > 0);

        let card_after_evict = counter.cardinality();
        assert!(
            card_after_evict < card_after_overflow,
            "cardinality should decrease after eviction: before={card_after_overflow} after={card_after_evict}"
        );

        // Eviction freed capacity, so new label sets should mostly fit again.
        let overflow_before = counter.overflow_count();
        counter.inc(&[("k", "new1")]);
        counter.inc(&[("k", "new2")]);

        assert!(counter.cardinality() <= 5);

        let overflow_after = counter.overflow_count();
        assert!(
            overflow_after - overflow_before <= 1,
            "unexpected overflow after eviction freed space"
        );
    }
}