1use super::cache::{CacheableSeries, LabelCache, SERIES_CACHE_SIZE};
4#[cfg(feature = "eviction")]
5use super::current_cycle;
6use super::{COUNTER_IDS, DynamicLabelSet, thread_id};
7use crossbeam_utils::CachePadded;
8use parking_lot::RwLock;
9use std::cell::RefCell;
10use std::collections::HashMap;
11use std::hash::{Hash, Hasher};
12#[cfg(feature = "eviction")]
13use std::sync::atomic::AtomicU32;
14use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicU64, AtomicUsize, Ordering};
15use std::sync::{Arc, Weak};
16
17const DEFAULT_MAX_SERIES: usize = 2000;
18const OVERFLOW_LABEL_KEY: &str = "__ft_overflow";
19const OVERFLOW_LABEL_VALUE: &str = "true";
20
21struct CounterSeries {
22 cells: Vec<CachePadded<AtomicIsize>>,
23 evicted: AtomicBool,
26 #[cfg(feature = "eviction")]
29 last_accessed_cycle: AtomicU32,
30}
31
32type CounterIndexShard = CachePadded<RwLock<HashMap<DynamicLabelSet, Arc<CounterSeries>>>>;
33
34impl CounterSeries {
35 #[cfg(feature = "eviction")]
36 fn new(shard_count: usize, current_cycle: u32) -> Self {
37 Self {
38 cells: (0..shard_count)
39 .map(|_| CachePadded::new(AtomicIsize::new(0)))
40 .collect(),
41 evicted: AtomicBool::new(false),
42 last_accessed_cycle: AtomicU32::new(current_cycle),
43 }
44 }
45
46 #[cfg(not(feature = "eviction"))]
47 fn new(shard_count: usize) -> Self {
48 Self {
49 cells: (0..shard_count)
50 .map(|_| CachePadded::new(AtomicIsize::new(0)))
51 .collect(),
52 evicted: AtomicBool::new(false),
53 }
54 }
55
56 #[inline]
57 fn add_at(&self, shard_idx: usize, value: isize) {
58 self.cells[shard_idx].fetch_add(value, Ordering::Relaxed);
59 }
62
63 #[cfg(feature = "eviction")]
65 #[inline]
66 fn touch(&self, cycle: u32) {
67 self.last_accessed_cycle.store(cycle, Ordering::Relaxed);
68 }
69
70 #[inline]
71 fn sum(&self) -> isize {
72 self.cells
73 .iter()
74 .map(|cell| cell.load(Ordering::Relaxed))
75 .sum()
76 }
77
78 #[inline]
79 fn is_evicted(&self) -> bool {
80 self.evicted.load(Ordering::Relaxed)
81 }
82
83 #[cfg(feature = "eviction")]
84 fn mark_evicted(&self) {
85 self.evicted.store(true, Ordering::Relaxed);
86 }
87}
88
89impl CacheableSeries for CounterSeries {
90 fn is_evicted(&self) -> bool {
91 self.is_evicted()
92 }
93}
94
95#[derive(Clone)]
101pub struct DynamicCounterSeries {
102 series: Arc<CounterSeries>,
103 shard_mask: usize,
104}
105
106impl DynamicCounterSeries {
107 #[inline]
109 pub fn inc(&self) {
110 self.add(1);
111 }
112
113 #[inline]
115 pub fn add(&self, value: isize) {
116 let shard_idx = thread_id() & self.shard_mask;
117 self.series.add_at(shard_idx, value);
118 }
119
120 #[inline]
122 pub fn get(&self) -> isize {
123 self.series.sum()
124 }
125
126 #[inline]
132 pub fn is_evicted(&self) -> bool {
133 self.series.is_evicted()
134 }
135}
136
137thread_local! {
138 static SERIES_CACHE: RefCell<LabelCache<Weak<CounterSeries>, SERIES_CACHE_SIZE>> =
139 RefCell::new(LabelCache::new());
140}
141
142pub struct DynamicCounter {
147 id: usize,
148 shard_count: usize,
149 max_series: usize,
150 shard_mask: usize,
151 index_shards: Vec<CounterIndexShard>,
152 series_count: AtomicUsize,
154 overflow_count: AtomicU64,
156}
157
158impl DynamicCounter {
159 pub fn new(shard_count: usize) -> Self {
161 Self::with_max_series(shard_count, DEFAULT_MAX_SERIES)
162 }
163
164 pub fn with_max_series(shard_count: usize, max_series: usize) -> Self {
172 let shard_count = shard_count.next_power_of_two();
173 let id = COUNTER_IDS.fetch_add(1, Ordering::Relaxed);
174 Self {
175 id,
176 shard_count,
177 max_series,
178 shard_mask: shard_count - 1,
179 index_shards: (0..shard_count)
180 .map(|_| CachePadded::new(RwLock::new(HashMap::new())))
181 .collect(),
182 series_count: AtomicUsize::new(0),
183 overflow_count: AtomicU64::new(0),
184 }
185 }
186
187 pub fn series(&self, labels: &[(&str, &str)]) -> DynamicCounterSeries {
191 if let Some(series) = self.cached_series(labels) {
192 return DynamicCounterSeries {
193 series,
194 shard_mask: self.shard_mask,
195 };
196 }
197 let series = self.lookup_or_create(labels);
198 self.update_cache(labels, &series);
199 DynamicCounterSeries {
200 series,
201 shard_mask: self.shard_mask,
202 }
203 }
204
205 #[inline]
207 pub fn inc(&self, labels: &[(&str, &str)]) {
208 self.add(labels, 1);
209 }
210
211 #[inline]
213 pub fn add(&self, labels: &[(&str, &str)], value: isize) {
214 if let Some(series) = self.cached_series(labels) {
215 let shard_idx = thread_id() & self.shard_mask;
216 series.add_at(shard_idx, value);
217 return;
218 }
219
220 let series = self.lookup_or_create(labels);
221 self.update_cache(labels, &series);
222 let shard_idx = thread_id() & self.shard_mask;
223 series.add_at(shard_idx, value);
224 }
225
226 pub fn get(&self, labels: &[(&str, &str)]) -> isize {
228 let key = DynamicLabelSet::from_pairs(labels);
229 let index_shard = self.index_shard_for(&key);
230 self.index_shards[index_shard]
231 .read()
232 .get(&key)
233 .map(|series| series.sum())
234 .unwrap_or(0)
235 }
236
237 pub fn sum_all(&self) -> isize {
239 self.snapshot().into_iter().map(|(_, value)| value).sum()
240 }
241
242 pub fn snapshot(&self) -> Vec<(DynamicLabelSet, isize)> {
244 let mut out = Vec::new();
245 for shard in &self.index_shards {
246 let guard = shard.read();
247 for (labels, series) in guard.iter() {
248 out.push((labels.clone(), series.sum()));
249 }
250 }
251 out
252 }
253
254 pub fn cardinality(&self) -> usize {
256 self.index_shards
257 .iter()
258 .map(|shard| shard.read().len())
259 .sum()
260 }
261
262 pub fn overflow_count(&self) -> u64 {
267 self.overflow_count.load(Ordering::Relaxed)
268 }
269
270 #[doc(hidden)]
277 pub fn visit_series(&self, mut f: impl FnMut(&[(String, String)], isize)) {
278 for shard in &self.index_shards {
279 let guard = shard.read();
280 for (labels, series) in guard.iter() {
281 f(labels.pairs(), series.sum());
282 }
283 }
284 }
285
286 #[cfg(feature = "eviction")]
297 pub fn evict_stale(&self, max_staleness: u32) -> usize {
298 let cycle = current_cycle();
299 let mut removed = 0;
300
301 for shard in &self.index_shards {
302 let mut guard = shard.write();
303 guard.retain(|_labels, series| {
304 if Arc::strong_count(series) > 1 {
307 return true;
308 }
309 let last = series.last_accessed_cycle.load(Ordering::Relaxed);
311 let stale = cycle.saturating_sub(last) > max_staleness;
312 if stale {
313 series.mark_evicted();
314 removed += 1;
315 self.series_count.fetch_sub(1, Ordering::Relaxed);
316 }
317 !stale
318 });
319 }
320
321 removed
322 }
323
324 fn lookup_or_create(&self, labels: &[(&str, &str)]) -> Arc<CounterSeries> {
325 let requested_key = DynamicLabelSet::from_pairs(labels);
326 let requested_shard = self.index_shard_for(&requested_key);
327 #[cfg(feature = "eviction")]
328 let cycle = current_cycle();
329
330 if let Some(series) = self.index_shards[requested_shard]
332 .read()
333 .get(&requested_key)
334 {
335 #[cfg(feature = "eviction")]
336 series.touch(cycle);
337 return Arc::clone(series);
338 }
339
340 let key = if self.max_series > 0
345 && self.series_count.load(Ordering::Relaxed) >= self.max_series
346 {
347 self.overflow_count.fetch_add(1, Ordering::Relaxed);
348 DynamicLabelSet::from_pairs(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)])
349 } else {
350 requested_key
351 };
352 let shard = self.index_shard_for(&key);
353
354 if let Some(series) = self.index_shards[shard].read().get(&key) {
356 #[cfg(feature = "eviction")]
357 series.touch(cycle);
358 return Arc::clone(series);
359 }
360
361 let mut guard = self.index_shards[shard].write();
362 if let Some(series) = guard.get(&key) {
363 #[cfg(feature = "eviction")]
364 series.touch(cycle);
365 return Arc::clone(series);
366 }
367 #[cfg(feature = "eviction")]
368 let series = Arc::new(CounterSeries::new(self.shard_count, cycle));
369 #[cfg(not(feature = "eviction"))]
370 let series = Arc::new(CounterSeries::new(self.shard_count));
371 guard.insert(key, Arc::clone(&series));
372 self.series_count.fetch_add(1, Ordering::Relaxed);
373 series
374 }
375
376 fn index_shard_for(&self, key: &DynamicLabelSet) -> usize {
377 let mut hasher = std::collections::hash_map::DefaultHasher::new();
378 key.hash(&mut hasher);
379 (hasher.finish() as usize) & self.shard_mask
380 }
381
382 fn cached_series(&self, labels: &[(&str, &str)]) -> Option<Arc<CounterSeries>> {
383 SERIES_CACHE.with(|cache| {
384 let series = cache.borrow_mut().get(self.id, labels)?;
385 #[cfg(feature = "eviction")]
386 series.touch(current_cycle());
387 Some(series)
388 })
389 }
390
391 fn update_cache(&self, labels: &[(&str, &str)], series: &Arc<CounterSeries>) {
392 SERIES_CACHE.with(|cache| {
393 cache
394 .borrow_mut()
395 .insert(self.id, labels, Arc::downgrade(series));
396 });
397 }
398}
399
400#[cfg(test)]
401mod tests {
402 #[cfg(feature = "eviction")]
403 use super::super::advance_cycle;
404 use super::*;
405
406 #[test]
407 fn test_basic_operations() {
408 let counter = DynamicCounter::new(4);
409 counter.inc(&[("org_id", "42"), ("endpoint_uuid", "abc")]);
410 counter.add(&[("org_id", "42"), ("endpoint_uuid", "abc")], 2);
411
412 assert_eq!(
413 counter.get(&[("org_id", "42"), ("endpoint_uuid", "abc")]),
414 3
415 );
416 assert_eq!(counter.sum_all(), 3);
417 }
418
419 #[test]
420 fn test_label_order_is_canonicalized() {
421 let counter = DynamicCounter::new(4);
422 counter.inc(&[("org_id", "42"), ("endpoint_uuid", "abc")]);
423
424 assert_eq!(
425 counter.get(&[("endpoint_uuid", "abc"), ("org_id", "42")]),
426 1
427 );
428 }
429
430 #[test]
431 fn test_series_handle() {
432 let counter = DynamicCounter::new(4);
433 let series = counter.series(&[("org_id", "42"), ("endpoint_uuid", "abc")]);
434 series.inc();
435 series.add(9);
436
437 assert_eq!(series.get(), 10);
438 assert_eq!(
439 counter.get(&[("org_id", "42"), ("endpoint_uuid", "abc")]),
440 10
441 );
442 }
443
444 #[test]
445 fn test_concurrent_adds() {
446 let counter = DynamicCounter::new(8);
447 let series = counter.series(&[("org_id", "42"), ("endpoint_uuid", "abc")]);
448
449 std::thread::scope(|s| {
450 for _ in 0..8 {
451 let series = series.clone();
452 s.spawn(move || {
453 for _ in 0..10_000 {
454 series.inc();
455 }
456 });
457 }
458 });
459
460 assert_eq!(
461 counter.get(&[("org_id", "42"), ("endpoint_uuid", "abc")]),
462 80_000
463 );
464 }
465
466 #[cfg(feature = "eviction")]
467 #[test]
468 fn test_evict_stale() {
469 let counter = DynamicCounter::new(4);
470 let labels = &[("org_id", "42")];
471
472 counter.inc(labels);
474 assert_eq!(counter.cardinality(), 1);
475 assert_eq!(counter.get(labels), 1);
476
477 advance_cycle();
479 advance_cycle();
480
481 counter.inc(&[("flush", "cache")]);
483
484 let removed = counter.evict_stale(1);
486 assert_eq!(removed, 1); assert_eq!(counter.cardinality(), 1); assert_eq!(counter.get(labels), 0);
491
492 counter.inc(labels);
494 assert_eq!(counter.cardinality(), 2);
495 assert_eq!(counter.get(labels), 1);
496 }
497
498 #[cfg(feature = "eviction")]
499 #[test]
500 fn test_evict_stale_keeps_active() {
501 let counter = DynamicCounter::new(4);
502 let active = &[("status", "active")];
503 let stale = &[("status", "stale")];
504
505 counter.inc(active);
507 counter.inc(stale);
508 assert_eq!(counter.cardinality(), 2);
509
510 advance_cycle();
512
513 counter.inc(active);
515
516 advance_cycle();
518
519 let removed = counter.evict_stale(1);
521 assert_eq!(removed, 1);
522 assert_eq!(counter.cardinality(), 1);
523 assert_eq!(counter.get(active), 2);
524 assert_eq!(counter.get(stale), 0);
525 }
526
527 #[cfg(feature = "eviction")]
528 #[test]
529 fn test_eviction_tombstone_invalidates_cache() {
530 let counter = DynamicCounter::new(4);
531 let labels = &[("org_id", "evict_test")];
532
533 counter.inc(labels);
535 counter.inc(labels); assert_eq!(counter.get(labels), 2);
537
538 advance_cycle();
540 advance_cycle();
541
542 counter.inc(&[("flush", "cache")]);
544
545 counter.evict_stale(1);
546
547 counter.inc(labels);
549 assert_eq!(counter.get(labels), 1); }
551
552 #[cfg(feature = "eviction")]
553 #[test]
554 fn test_series_handle_protects_from_eviction() {
555 let counter = DynamicCounter::new(4);
556 let labels = &[("org_id", "handle_test")];
557
558 let series = counter.series(labels);
560 series.inc();
561 assert!(!series.is_evicted());
562
563 advance_cycle();
565 advance_cycle();
566 let removed = counter.evict_stale(1);
567
568 assert_eq!(removed, 0);
570 assert!(!series.is_evicted());
571 assert_eq!(counter.cardinality(), 1);
572 assert_eq!(counter.get(labels), 1);
573
574 series.inc();
576 assert_eq!(counter.get(labels), 2);
577 }
578
579 #[cfg(feature = "eviction")]
580 #[test]
581 fn test_series_evicted_after_handle_dropped() {
582 let counter = DynamicCounter::new(4);
583 let labels = &[("org_id", "handle_drop_test")];
584
585 {
587 let series = counter.series(labels);
588 series.inc();
589 }
590 assert_eq!(counter.cardinality(), 1);
593 assert_eq!(counter.get(labels), 1);
594
595 advance_cycle();
597 advance_cycle();
598
599 counter.inc(&[("flush", "cache")]);
601
602 let removed = counter.evict_stale(1);
604 assert_eq!(removed, 1);
605 assert_eq!(counter.get(labels), 0);
606 }
607
608 #[test]
609 fn test_overflow_bucket_routes_new_series_at_capacity() {
610 let counter = DynamicCounter::with_max_series(4, 2);
611
612 counter.inc(&[("org_id", "1")]);
613 counter.inc(&[("org_id", "2")]);
614 counter.inc(&[("org_id", "3")]);
615
616 assert_eq!(counter.cardinality(), 3);
617 assert_eq!(
618 counter.get(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)]),
619 1
620 );
621 }
622
623 #[test]
624 fn test_concurrent_cap_bounded_overshoot() {
625 use std::sync::{Arc, Barrier};
626 use std::thread;
627
628 let cap = 10;
629 let threads = 16;
630 let counter = Arc::new(DynamicCounter::with_max_series(4, cap));
631 let barrier = Arc::new(Barrier::new(threads));
632
633 let handles: Vec<_> = (0..threads)
634 .map(|t| {
635 let counter = Arc::clone(&counter);
636 let barrier = Arc::clone(&barrier);
637 thread::spawn(move || {
638 barrier.wait();
639 for i in 0..5 {
641 let label = format!("t{t}_s{i}");
642 counter.inc(&[("key", &label)]);
643 }
644 })
645 })
646 .collect();
647
648 for h in handles {
649 h.join().unwrap();
650 }
651
652 let card = counter.cardinality();
653 assert!(
656 card <= cap + threads + 1, "cardinality {card} exceeded bounded overshoot (cap={cap}, threads={threads})"
658 );
659 assert!(
661 counter.overflow_count() > 0,
662 "overflow should have triggered"
663 );
664 }
665
666 #[cfg(feature = "eviction")]
667 #[test]
668 fn test_eviction_and_reinsertion_bookkeeping() {
669 let counter = DynamicCounter::with_max_series(4, 3);
670
671 counter.inc(&[("k", "a")]);
672 counter.inc(&[("k", "b")]);
673 counter.inc(&[("k", "c")]);
674 assert_eq!(counter.cardinality(), 3);
675
676 counter.inc(&[("k", "d")]);
677 assert!(counter.overflow_count() > 0);
678 let card_after_overflow = counter.cardinality();
679 assert!(card_after_overflow <= 4);
680
681 advance_cycle();
682 advance_cycle();
683 advance_cycle();
684 counter.inc(&[("flush", "cache")]);
685 let evicted = counter.evict_stale(1);
686 assert!(evicted > 0);
687
688 let card_after_evict = counter.cardinality();
689 assert!(
690 card_after_evict < card_after_overflow,
691 "cardinality should decrease after eviction: before={card_after_overflow} after={card_after_evict}"
692 );
693
694 let overflow_before = counter.overflow_count();
695 counter.inc(&[("k", "new1")]);
696 counter.inc(&[("k", "new2")]);
697
698 assert!(counter.cardinality() <= 5);
699
700 let overflow_after = counter.overflow_count();
701 assert!(
702 overflow_after - overflow_before <= 1,
703 "unexpected overflow after eviction freed space"
704 );
705 }
706}