// fast_telemetry/metric/dynamic/gauge_i64.rs
use super::cache::{CacheableSeries, LabelCache, SERIES_CACHE_SIZE};
8#[cfg(feature = "eviction")]
9use super::current_cycle;
10use super::{DynamicLabelSet, thread_id};
11use crossbeam_utils::CachePadded;
12use parking_lot::RwLock;
13use std::cell::RefCell;
14use std::collections::HashMap;
15use std::hash::{Hash, Hasher};
16#[cfg(feature = "eviction")]
17use std::sync::atomic::AtomicU32;
18use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
19use std::sync::{Arc, Weak};
20
// Monotonically increasing id source; every `DynamicGaugeI64` takes a unique
// id so the thread-local series cache can namespace entries per metric.
static GAUGE_I64_IDS: AtomicUsize = AtomicUsize::new(1);
// Default cap on distinct label sets before new ones fold into the overflow series.
const DEFAULT_MAX_SERIES: usize = 2000;
// Label pair under which all over-capacity writes are accumulated.
const OVERFLOW_LABEL_KEY: &str = "__ft_overflow";
const OVERFLOW_LABEL_VALUE: &str = "true";
// One lock-sharded slice of the label-set -> series index.
type GaugeI64IndexShard = CachePadded<RwLock<HashMap<DynamicLabelSet, Arc<GaugeI64Series>>>>;
26
/// Shared storage for one labelled gauge series: a set of per-shard atomic
/// cells whose sum is the logical gauge value.
struct GaugeI64Series {
    // One cache-padded counter per shard to avoid false sharing between
    // writer threads.
    cells: Vec<CachePadded<AtomicI64>>,
    // Set once the series is removed from the parent index; surviving
    // handles can query it via `is_evicted`.
    evicted: AtomicBool,
    // Cycle stamp of the most recent access; consulted by `evict_stale`.
    #[cfg(feature = "eviction")]
    last_accessed_cycle: AtomicU32,
}
35
36impl GaugeI64Series {
37 #[cfg(feature = "eviction")]
38 fn new(shard_count: usize, cycle: u32) -> Self {
39 Self {
40 cells: (0..shard_count)
41 .map(|_| CachePadded::new(AtomicI64::new(0)))
42 .collect(),
43 evicted: AtomicBool::new(false),
44 last_accessed_cycle: AtomicU32::new(cycle),
45 }
46 }
47
48 #[cfg(not(feature = "eviction"))]
49 fn new(shard_count: usize) -> Self {
50 Self {
51 cells: (0..shard_count)
52 .map(|_| CachePadded::new(AtomicI64::new(0)))
53 .collect(),
54 evicted: AtomicBool::new(false),
55 }
56 }
57
58 #[inline]
59 fn add_at(&self, shard_idx: usize, value: i64) {
60 self.cells[shard_idx].fetch_add(value, Ordering::Relaxed);
61 }
64
65 #[inline]
66 fn set_at(&self, shard_idx: usize, value: i64) {
67 for (i, cell) in self.cells.iter().enumerate() {
70 if i == shard_idx {
71 cell.store(value, Ordering::Relaxed);
72 } else {
73 cell.store(0, Ordering::Relaxed);
74 }
75 }
76 }
79
80 #[cfg(feature = "eviction")]
82 #[inline]
83 fn touch(&self, cycle: u32) {
84 self.last_accessed_cycle.store(cycle, Ordering::Relaxed);
85 }
86
87 #[inline]
88 fn sum(&self) -> i64 {
89 self.cells
90 .iter()
91 .map(|cell| cell.load(Ordering::Relaxed))
92 .sum()
93 }
94
95 #[inline]
96 fn is_evicted(&self) -> bool {
97 self.evicted.load(Ordering::Relaxed)
98 }
99
100 #[cfg(feature = "eviction")]
101 fn mark_evicted(&self) {
102 self.evicted.store(true, Ordering::Relaxed);
103 }
104}
105
// Exposes the eviction flag to the shared `LabelCache` machinery
// (presumably so stale cache entries can be dropped — see the `cache` module).
impl CacheableSeries for GaugeI64Series {
    fn is_evicted(&self) -> bool {
        self.is_evicted()
    }
}
111
/// A cheap, cloneable handle to one labelled gauge series. Operations on the
/// handle bypass the label lookup entirely.
#[derive(Clone)]
pub struct DynamicGaugeI64Series {
    series: Arc<GaugeI64Series>,
    // shard_count - 1 (power of two); maps a thread id to a shard cell.
    shard_mask: usize,
}
122
123impl DynamicGaugeI64Series {
124 #[inline]
126 pub fn inc(&self) {
127 self.add(1);
128 }
129
130 #[inline]
132 pub fn dec(&self) {
133 self.add(-1);
134 }
135
136 #[inline]
138 pub fn add(&self, value: i64) {
139 let shard_idx = thread_id() & self.shard_mask;
140 self.series.add_at(shard_idx, value);
141 }
142
143 #[inline]
145 pub fn set(&self, value: i64) {
146 let shard_idx = thread_id() & self.shard_mask;
147 self.series.set_at(shard_idx, value);
148 }
149
150 #[inline]
152 pub fn get(&self) -> i64 {
153 self.series.sum()
154 }
155
156 #[inline]
158 pub fn is_evicted(&self) -> bool {
159 self.series.is_evicted()
160 }
161}
162
thread_local! {
    // Per-thread label-set -> series cache. Stores `Weak` references so a
    // cached entry does not keep an evicted series alive.
    static SERIES_CACHE: RefCell<LabelCache<Weak<GaugeI64Series>, SERIES_CACHE_SIZE>> =
        RefCell::new(LabelCache::new());
}
167
/// A dynamically-labelled i64 gauge: a lock-sharded index of label sets to
/// series, per-shard write cells, and a bounded series cardinality.
pub struct DynamicGaugeI64 {
    // Unique metric id; namespaces this metric's entries in SERIES_CACHE.
    id: usize,
    // Number of write cells per series (rounded up to a power of two).
    shard_count: usize,
    // Cap on distinct label sets; 0 disables the cap.
    max_series: usize,
    // shard_count - 1; masks thread ids and key hashes into shard indices.
    shard_mask: usize,
    // Lock-sharded map from label set to its series.
    index_shards: Vec<GaugeI64IndexShard>,
    // Approximate live-series count, read (unlocked) for the max_series check.
    series_count: AtomicUsize,
    // Times a new label set was redirected to the overflow series.
    overflow_count: AtomicU64,
}
185
impl DynamicGaugeI64 {
    /// Creates a gauge with `shard_count` write cells per series (rounded up
    /// to a power of two) and the default cardinality cap.
    pub fn new(shard_count: usize) -> Self {
        Self::with_max_series(shard_count, DEFAULT_MAX_SERIES)
    }

    /// Creates a gauge with an explicit cardinality cap: once `max_series`
    /// distinct label sets exist, further new label sets are folded into a
    /// single overflow series. `max_series == 0` disables the cap.
    pub fn with_max_series(shard_count: usize, max_series: usize) -> Self {
        // Power-of-two shard count lets `& shard_mask` replace a modulo.
        let shard_count = shard_count.next_power_of_two();
        let id = GAUGE_I64_IDS.fetch_add(1, Ordering::Relaxed);
        Self {
            id,
            shard_count,
            max_series,
            shard_mask: shard_count - 1,
            // NOTE: the index uses the same shard count as the write cells.
            index_shards: (0..shard_count)
                .map(|_| CachePadded::new(RwLock::new(HashMap::new())))
                .collect(),
            series_count: AtomicUsize::new(0),
            overflow_count: AtomicU64::new(0),
        }
    }

    /// Returns a reusable handle for `labels`, creating the series if needed.
    pub fn series(&self, labels: &[(&str, &str)]) -> DynamicGaugeI64Series {
        // Fast path: hit in the thread-local cache.
        if let Some(series) = self.cached_series(labels) {
            return DynamicGaugeI64Series {
                series,
                shard_mask: self.shard_mask,
            };
        }
        let series = self.lookup_or_create(labels);
        self.update_cache(labels, &series);
        DynamicGaugeI64Series {
            series,
            shard_mask: self.shard_mask,
        }
    }

    /// Adds one to the series for `labels`.
    #[inline]
    pub fn inc(&self, labels: &[(&str, &str)]) {
        self.add(labels, 1);
    }

    /// Subtracts one from the series for `labels`.
    #[inline]
    pub fn dec(&self, labels: &[(&str, &str)]) {
        self.add(labels, -1);
    }

    /// Adds `value` (possibly negative) to the series for `labels`,
    /// creating the series on first use.
    #[inline]
    pub fn add(&self, labels: &[(&str, &str)], value: i64) {
        // Fast path: cached series, no index lock taken.
        if let Some(series) = self.cached_series(labels) {
            let shard_idx = thread_id() & self.shard_mask;
            series.add_at(shard_idx, value);
            return;
        }

        let series = self.lookup_or_create(labels);
        self.update_cache(labels, &series);
        let shard_idx = thread_id() & self.shard_mask;
        series.add_at(shard_idx, value);
    }

    /// Sets the series for `labels` to `value` (clears all other shard cells
    /// so the summed value equals `value`).
    #[inline]
    pub fn set(&self, labels: &[(&str, &str)], value: i64) {
        if let Some(series) = self.cached_series(labels) {
            let shard_idx = thread_id() & self.shard_mask;
            series.set_at(shard_idx, value);
            return;
        }

        let series = self.lookup_or_create(labels);
        self.update_cache(labels, &series);
        let shard_idx = thread_id() & self.shard_mask;
        series.set_at(shard_idx, value);
    }

    /// Reads the current value for `labels` without creating the series;
    /// returns 0 when the series does not exist.
    pub fn get(&self, labels: &[(&str, &str)]) -> i64 {
        let key = DynamicLabelSet::from_pairs(labels);
        let index_shard = self.index_shard_for(&key);
        self.index_shards[index_shard]
            .read()
            .get(&key)
            .map(|series| series.sum())
            .unwrap_or(0)
    }

    /// Sum of every series' current value.
    pub fn sum_all(&self) -> i64 {
        self.snapshot().into_iter().map(|(_, value)| value).sum()
    }

    /// Point-in-time copy of every (label set, value) pair. Shards are read
    /// one at a time, so the snapshot is not atomic across shards.
    pub fn snapshot(&self) -> Vec<(DynamicLabelSet, i64)> {
        let mut out = Vec::new();
        for shard in &self.index_shards {
            let guard = shard.read();
            for (labels, series) in guard.iter() {
                out.push((labels.clone(), series.sum()));
            }
        }
        out
    }

    /// Number of live series across all index shards.
    pub fn cardinality(&self) -> usize {
        self.index_shards
            .iter()
            .map(|shard| shard.read().len())
            .sum()
    }

    /// How many times a new label set was redirected to the overflow series.
    pub fn overflow_count(&self) -> u64 {
        self.overflow_count.load(Ordering::Relaxed)
    }

    /// Iterates every live series, passing its label pairs and current sum.
    pub(crate) fn visit_series(&self, mut f: impl FnMut(&[(String, String)], i64)) {
        for shard in &self.index_shards {
            let guard = shard.read();
            for (labels, series) in guard.iter() {
                f(labels.pairs(), series.sum());
            }
        }
    }

    /// Removes series not touched within `max_staleness` cycles and returns
    /// how many were dropped. A series still referenced by a live handle
    /// (Arc strong count > 1) is never evicted.
    #[cfg(feature = "eviction")]
    pub fn evict_stale(&self, max_staleness: u32) -> usize {
        let cycle = current_cycle();
        let mut removed = 0;

        for shard in &self.index_shards {
            let mut guard = shard.write();
            guard.retain(|_labels, series| {
                // A strong count above 1 means some handle still holds the
                // series — keep it regardless of staleness.
                if Arc::strong_count(series) > 1 {
                    return true;
                }
                let last = series.last_accessed_cycle.load(Ordering::Relaxed);
                let stale = cycle.saturating_sub(last) > max_staleness;
                if stale {
                    // Tombstone so any cached Weak upgrade can detect removal.
                    series.mark_evicted();
                    removed += 1;
                    self.series_count.fetch_sub(1, Ordering::Relaxed);
                }
                !stale
            });
        }

        removed
    }

    /// Finds or creates the series for `labels`, applying the cardinality cap.
    fn lookup_or_create(&self, labels: &[(&str, &str)]) -> Arc<GaugeI64Series> {
        let requested_key = DynamicLabelSet::from_pairs(labels);
        let requested_shard = self.index_shard_for(&requested_key);
        #[cfg(feature = "eviction")]
        let cycle = current_cycle();

        // Fast path: the series already exists, found under a read lock.
        if let Some(series) = self.index_shards[requested_shard]
            .read()
            .get(&requested_key)
        {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }

        // At capacity, fold the new label set into the shared overflow
        // series instead of growing the index. The count check is
        // best-effort: `series_count` is read outside any lock, so
        // concurrent creates can briefly exceed `max_series`.
        let key = if self.max_series > 0
            && self.series_count.load(Ordering::Relaxed) >= self.max_series
        {
            self.overflow_count.fetch_add(1, Ordering::Relaxed);
            DynamicLabelSet::from_pairs(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)])
        } else {
            requested_key
        };
        // The (possibly rewritten) key may hash to a different shard.
        let shard = self.index_shard_for(&key);

        if let Some(series) = self.index_shards[shard].read().get(&key) {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }

        // Slow path: double-checked insert under the write lock (another
        // thread may have inserted between the read and write locks).
        let mut guard = self.index_shards[shard].write();
        if let Some(series) = guard.get(&key) {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }
        #[cfg(feature = "eviction")]
        let series = Arc::new(GaugeI64Series::new(self.shard_count, cycle));
        #[cfg(not(feature = "eviction"))]
        let series = Arc::new(GaugeI64Series::new(self.shard_count));
        guard.insert(key, Arc::clone(&series));
        self.series_count.fetch_add(1, Ordering::Relaxed);
        series
    }

    /// Maps a label set to an index shard via its hash.
    fn index_shard_for(&self, key: &DynamicLabelSet) -> usize {
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        key.hash(&mut hasher);
        (hasher.finish() as usize) & self.shard_mask
    }

    /// Thread-local cache lookup; `LabelCache::get` presumably upgrades the
    /// stored Weak, yielding None on a miss or a dead reference — confirm
    /// against the `cache` module.
    fn cached_series(&self, labels: &[(&str, &str)]) -> Option<Arc<GaugeI64Series>> {
        SERIES_CACHE.with(|cache| {
            let series = cache.borrow_mut().get(self.id, labels)?;
            #[cfg(feature = "eviction")]
            series.touch(current_cycle());
            Some(series)
        })
    }

    /// Records a weak reference to `series` in the thread-local cache.
    fn update_cache(&self, labels: &[(&str, &str)], series: &Arc<GaugeI64Series>) {
        SERIES_CACHE.with(|cache| {
            cache
                .borrow_mut()
                .insert(self.id, labels, Arc::downgrade(series));
        });
    }
}
439
#[cfg(test)]
mod tests {
    #[cfg(feature = "eviction")]
    use super::super::advance_cycle;
    use super::*;

    #[test]
    fn test_basic_operations() {
        let gauge = DynamicGaugeI64::new(4);
        let ep1 = [("endpoint_id", "ep1")];

        gauge.inc(&ep1);
        gauge.add(&ep1, 2);
        assert_eq!(gauge.get(&ep1), 3);

        gauge.dec(&ep1);
        assert_eq!(gauge.get(&ep1), 2);

        gauge.add(&ep1, -2);
        assert_eq!(gauge.get(&ep1), 0);
    }

    #[test]
    fn test_series_handle() {
        let gauge = DynamicGaugeI64::new(4);
        let handle = gauge.series(&[("endpoint_id", "ep1")]);
        handle.inc();
        handle.inc();
        handle.dec();

        // Handle and label-based lookup must agree.
        assert_eq!(handle.get(), 1);
        assert_eq!(gauge.get(&[("endpoint_id", "ep1")]), 1);
    }

    #[test]
    fn test_snapshot() {
        let gauge = DynamicGaugeI64::new(4);
        gauge.add(&[("endpoint_id", "ep1")], 10);
        gauge.add(&[("endpoint_id", "ep2")], 20);

        let snap = gauge.snapshot();
        assert_eq!(snap.len(), 2);
        assert_eq!(snap.iter().map(|(_, v)| v).sum::<i64>(), 30);
    }

    #[cfg(feature = "eviction")]
    #[test]
    fn test_evict_stale() {
        let gauge = DynamicGaugeI64::new(4);
        let labels = &[("endpoint_id", "evict_i64")];

        gauge.add(labels, 5);
        assert_eq!(gauge.cardinality(), 1);

        advance_cycle();
        advance_cycle();

        // Touch a different label set so only the first series goes stale.
        gauge.add(&[("flush", "cache")], 1);

        assert_eq!(gauge.evict_stale(1), 1);
        assert_eq!(gauge.cardinality(), 1);
        assert_eq!(gauge.get(labels), 0);
    }

    #[cfg(feature = "eviction")]
    #[test]
    fn test_series_handle_protects_from_eviction() {
        let gauge = DynamicGaugeI64::new(4);
        let labels = &[("endpoint_id", "tombstone_i64")];

        let handle = gauge.series(labels);
        handle.add(5);
        assert!(!handle.is_evicted());

        advance_cycle();
        advance_cycle();

        // The live handle keeps the Arc strong count above one, so the
        // series must survive eviction untouched.
        assert_eq!(gauge.evict_stale(1), 0);
        assert!(!handle.is_evicted());
        assert_eq!(gauge.get(labels), 5);
    }

    #[test]
    fn test_overflow_bucket_routes_new_series_at_capacity() {
        let gauge = DynamicGaugeI64::with_max_series(4, 1);
        gauge.add(&[("endpoint_id", "1")], 1);
        gauge.add(&[("endpoint_id", "2")], 2);

        // One real series plus the shared overflow series.
        assert_eq!(gauge.cardinality(), 2);
        assert_eq!(gauge.get(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)]), 2);
    }
}
538}