fast_telemetry/metric/dynamic/
gauge_i64.rs1use super::cache::{CacheableSeries, LabelCache, SERIES_CACHE_SIZE};
8#[cfg(feature = "eviction")]
9use super::current_cycle;
10use super::{DynamicLabelSet, thread_id};
11use crossbeam_utils::CachePadded;
12use parking_lot::RwLock;
13use std::cell::RefCell;
14use std::collections::HashMap;
15use std::hash::{Hash, Hasher};
16#[cfg(feature = "eviction")]
17use std::sync::atomic::AtomicU32;
18use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
19use std::sync::{Arc, Weak};
20
/// Source of per-instance ids: every `DynamicGaugeI64` constructor call takes the next value.
static GAUGE_I64_IDS: AtomicUsize = AtomicUsize::new(1);
/// Series cap used by `DynamicGaugeI64::new`.
const DEFAULT_MAX_SERIES: usize = 2000;
/// Label key of the shared series that absorbs updates for new label sets once the cap is hit.
const OVERFLOW_LABEL_KEY: &str = "__ft_overflow";
/// Label value paired with `OVERFLOW_LABEL_KEY`.
const OVERFLOW_LABEL_VALUE: &str = "true";
/// One shard of the label-set -> series index: a cache-padded, read/write-locked hash map.
type GaugeI64IndexShard = CachePadded<RwLock<HashMap<DynamicLabelSet, Arc<GaugeI64Series>>>>;
26
/// Backing storage for one labelled gauge series.
///
/// The value is spread over several atomic cells; the gauge reading is the sum of all cells.
struct GaugeI64Series {
    /// One cache-padded cell per shard; writers select a cell by shard index.
    cells: Vec<CachePadded<AtomicI64>>,
    /// Set once eviction has removed this series from its gauge's index.
    evicted: AtomicBool,
    /// Cycle number stored the last time this series was created, looked up or touched.
    #[cfg(feature = "eviction")]
    last_accessed_cycle: AtomicU32,
}
35
36impl GaugeI64Series {
37 #[cfg(feature = "eviction")]
38 fn new(shard_count: usize, cycle: u32) -> Self {
39 Self {
40 cells: (0..shard_count)
41 .map(|_| CachePadded::new(AtomicI64::new(0)))
42 .collect(),
43 evicted: AtomicBool::new(false),
44 last_accessed_cycle: AtomicU32::new(cycle),
45 }
46 }
47
48 #[cfg(not(feature = "eviction"))]
49 fn new(shard_count: usize) -> Self {
50 Self {
51 cells: (0..shard_count)
52 .map(|_| CachePadded::new(AtomicI64::new(0)))
53 .collect(),
54 evicted: AtomicBool::new(false),
55 }
56 }
57
58 #[inline]
59 fn add_at(&self, shard_idx: usize, value: i64) {
60 self.cells[shard_idx].fetch_add(value, Ordering::Relaxed);
61 }
64
65 #[inline]
66 fn set_at(&self, shard_idx: usize, value: i64) {
67 for (i, cell) in self.cells.iter().enumerate() {
70 if i == shard_idx {
71 cell.store(value, Ordering::Relaxed);
72 } else {
73 cell.store(0, Ordering::Relaxed);
74 }
75 }
76 }
79
80 #[cfg(feature = "eviction")]
82 #[inline]
83 fn touch(&self, cycle: u32) {
84 self.last_accessed_cycle.store(cycle, Ordering::Relaxed);
85 }
86
87 #[inline]
88 fn sum(&self) -> i64 {
89 self.cells
90 .iter()
91 .map(|cell| cell.load(Ordering::Relaxed))
92 .sum()
93 }
94
95 #[inline]
96 fn is_evicted(&self) -> bool {
97 self.evicted.load(Ordering::Relaxed)
98 }
99
100 #[cfg(feature = "eviction")]
101 fn mark_evicted(&self) {
102 self.evicted.store(true, Ordering::Relaxed);
103 }
104}
105
106impl CacheableSeries for GaugeI64Series {
107 fn is_evicted(&self) -> bool {
108 self.is_evicted()
109 }
110}
111
/// Handle to a single gauge series, obtained from `DynamicGaugeI64::series`.
///
/// The handle owns a strong reference to the series, so updates through it need no label
/// lookup. With the `eviction` feature, `evict_stale` keeps any series whose strong count is
/// above one, so a live handle keeps its series in the index.
#[derive(Clone)]
pub struct DynamicGaugeI64Series {
    /// Shared storage of the series.
    series: Arc<GaugeI64Series>,
    /// Mask applied to `thread_id()` to select the calling thread's cell.
    shard_mask: usize,
}
122
123impl DynamicGaugeI64Series {
124 #[inline]
126 pub fn inc(&self) {
127 self.add(1);
128 }
129
130 #[inline]
132 pub fn dec(&self) {
133 self.add(-1);
134 }
135
136 #[inline]
138 pub fn add(&self, value: i64) {
139 let shard_idx = thread_id() & self.shard_mask;
140 self.series.add_at(shard_idx, value);
141 }
142
143 #[inline]
145 pub fn set(&self, value: i64) {
146 let shard_idx = thread_id() & self.shard_mask;
147 self.series.set_at(shard_idx, value);
148 }
149
150 #[inline]
152 pub fn get(&self) -> i64 {
153 self.series.sum()
154 }
155
156 #[inline]
158 pub fn is_evicted(&self) -> bool {
159 self.series.is_evicted()
160 }
161}
162
thread_local! {
    /// Per-thread cache of recently used series, keyed by gauge id and labels.
    ///
    /// Entries are `Weak` references, so the cache by itself does not keep a series alive.
    static SERIES_CACHE: RefCell<LabelCache<Weak<GaugeI64Series>, SERIES_CACHE_SIZE>> =
        RefCell::new(LabelCache::new());
}
167
/// An `i64` gauge whose series are created on demand from runtime label sets.
///
/// Series live in a sharded, lock-protected index; each series spreads its value over
/// `shard_count` atomic cells selected by `thread_id()`.
pub struct DynamicGaugeI64 {
    /// Unique id of this gauge, used as part of the thread-local cache key.
    id: usize,
    /// Number of cells per series and number of index shards (a power of two).
    shard_count: usize,
    /// Cap on the number of series; `0` disables the cap.
    max_series: usize,
    /// `shard_count - 1`, used to mask thread ids and key hashes into a shard index.
    shard_mask: usize,
    /// The label-set -> series index, split into independently locked shards.
    index_shards: Vec<GaugeI64IndexShard>,
    /// Number of series currently held in the index (the overflow series included).
    series_count: AtomicUsize,
    /// Number of lookups that were redirected to the overflow series.
    overflow_count: AtomicU64,
}
185
186impl DynamicGaugeI64 {
187 pub fn new(shard_count: usize) -> Self {
189 Self::with_max_series(shard_count, DEFAULT_MAX_SERIES)
190 }
191
192 pub fn with_max_series(shard_count: usize, max_series: usize) -> Self {
200 let shard_count = shard_count.next_power_of_two();
201 let id = GAUGE_I64_IDS.fetch_add(1, Ordering::Relaxed);
202 Self {
203 id,
204 shard_count,
205 max_series,
206 shard_mask: shard_count - 1,
207 index_shards: (0..shard_count)
208 .map(|_| CachePadded::new(RwLock::new(HashMap::new())))
209 .collect(),
210 series_count: AtomicUsize::new(0),
211 overflow_count: AtomicU64::new(0),
212 }
213 }
214
215 pub fn series(&self, labels: &[(&str, &str)]) -> DynamicGaugeI64Series {
217 if let Some(series) = self.cached_series(labels) {
218 return DynamicGaugeI64Series {
219 series,
220 shard_mask: self.shard_mask,
221 };
222 }
223 let series = self.lookup_or_create(labels);
224 self.update_cache(labels, &series);
225 DynamicGaugeI64Series {
226 series,
227 shard_mask: self.shard_mask,
228 }
229 }
230
231 #[inline]
233 pub fn inc(&self, labels: &[(&str, &str)]) {
234 self.add(labels, 1);
235 }
236
237 #[inline]
239 pub fn dec(&self, labels: &[(&str, &str)]) {
240 self.add(labels, -1);
241 }
242
243 #[inline]
245 pub fn add(&self, labels: &[(&str, &str)], value: i64) {
246 if let Some(series) = self.cached_series(labels) {
247 let shard_idx = thread_id() & self.shard_mask;
248 series.add_at(shard_idx, value);
249 return;
250 }
251
252 let series = self.lookup_or_create(labels);
253 self.update_cache(labels, &series);
254 let shard_idx = thread_id() & self.shard_mask;
255 series.add_at(shard_idx, value);
256 }
257
258 #[inline]
260 pub fn set(&self, labels: &[(&str, &str)], value: i64) {
261 if let Some(series) = self.cached_series(labels) {
262 let shard_idx = thread_id() & self.shard_mask;
263 series.set_at(shard_idx, value);
264 return;
265 }
266
267 let series = self.lookup_or_create(labels);
268 self.update_cache(labels, &series);
269 let shard_idx = thread_id() & self.shard_mask;
270 series.set_at(shard_idx, value);
271 }
272
273 pub fn get(&self, labels: &[(&str, &str)]) -> i64 {
275 let key = DynamicLabelSet::from_pairs(labels);
276 let index_shard = self.index_shard_for(&key);
277 self.index_shards[index_shard]
278 .read()
279 .get(&key)
280 .map(|series| series.sum())
281 .unwrap_or(0)
282 }
283
284 pub fn sum_all(&self) -> i64 {
286 self.snapshot().into_iter().map(|(_, value)| value).sum()
287 }
288
289 pub fn snapshot(&self) -> Vec<(DynamicLabelSet, i64)> {
291 let mut out = Vec::new();
292 for shard in &self.index_shards {
293 let guard = shard.read();
294 for (labels, series) in guard.iter() {
295 out.push((labels.clone(), series.sum()));
296 }
297 }
298 out
299 }
300
301 pub fn cardinality(&self) -> usize {
303 self.index_shards
304 .iter()
305 .map(|shard| shard.read().len())
306 .sum()
307 }
308
309 pub fn overflow_count(&self) -> u64 {
314 self.overflow_count.load(Ordering::Relaxed)
315 }
316
317 #[doc(hidden)]
324 pub fn visit_series(&self, mut f: impl FnMut(&[(String, String)], i64)) {
325 for shard in &self.index_shards {
326 let guard = shard.read();
327 for (labels, series) in guard.iter() {
328 f(labels.pairs(), series.sum());
329 }
330 }
331 }
332
333 #[cfg(feature = "eviction")]
344 pub fn evict_stale(&self, max_staleness: u32) -> usize {
345 let cycle = current_cycle();
346 let mut removed = 0;
347
348 for shard in &self.index_shards {
349 let mut guard = shard.write();
350 guard.retain(|_labels, series| {
351 if Arc::strong_count(series) > 1 {
354 return true;
355 }
356 let last = series.last_accessed_cycle.load(Ordering::Relaxed);
358 let stale = cycle.saturating_sub(last) > max_staleness;
359 if stale {
360 series.mark_evicted();
361 removed += 1;
362 self.series_count.fetch_sub(1, Ordering::Relaxed);
363 }
364 !stale
365 });
366 }
367
368 removed
369 }
370
371 fn lookup_or_create(&self, labels: &[(&str, &str)]) -> Arc<GaugeI64Series> {
372 let requested_key = DynamicLabelSet::from_pairs(labels);
373 let requested_shard = self.index_shard_for(&requested_key);
374 #[cfg(feature = "eviction")]
375 let cycle = current_cycle();
376
377 if let Some(series) = self.index_shards[requested_shard]
379 .read()
380 .get(&requested_key)
381 {
382 #[cfg(feature = "eviction")]
383 series.touch(cycle);
384 return Arc::clone(series);
385 }
386
387 let key = if self.max_series > 0
389 && self.series_count.load(Ordering::Relaxed) >= self.max_series
390 {
391 self.overflow_count.fetch_add(1, Ordering::Relaxed);
392 DynamicLabelSet::from_pairs(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)])
393 } else {
394 requested_key
395 };
396 let shard = self.index_shard_for(&key);
397
398 if let Some(series) = self.index_shards[shard].read().get(&key) {
399 #[cfg(feature = "eviction")]
400 series.touch(cycle);
401 return Arc::clone(series);
402 }
403
404 let mut guard = self.index_shards[shard].write();
405 if let Some(series) = guard.get(&key) {
406 #[cfg(feature = "eviction")]
407 series.touch(cycle);
408 return Arc::clone(series);
409 }
410 #[cfg(feature = "eviction")]
411 let series = Arc::new(GaugeI64Series::new(self.shard_count, cycle));
412 #[cfg(not(feature = "eviction"))]
413 let series = Arc::new(GaugeI64Series::new(self.shard_count));
414 guard.insert(key, Arc::clone(&series));
415 self.series_count.fetch_add(1, Ordering::Relaxed);
416 series
417 }
418
419 fn index_shard_for(&self, key: &DynamicLabelSet) -> usize {
420 let mut hasher = std::collections::hash_map::DefaultHasher::new();
421 key.hash(&mut hasher);
422 (hasher.finish() as usize) & self.shard_mask
423 }
424
425 fn cached_series(&self, labels: &[(&str, &str)]) -> Option<Arc<GaugeI64Series>> {
426 SERIES_CACHE.with(|cache| {
427 let series = cache.borrow_mut().get(self.id, labels)?;
428 #[cfg(feature = "eviction")]
429 series.touch(current_cycle());
430 Some(series)
431 })
432 }
433
434 fn update_cache(&self, labels: &[(&str, &str)], series: &Arc<GaugeI64Series>) {
435 SERIES_CACHE.with(|cache| {
436 cache
437 .borrow_mut()
438 .insert(self.id, labels, Arc::downgrade(series));
439 });
440 }
441}
442
#[cfg(test)]
mod tests {
    #[cfg(feature = "eviction")]
    use super::super::advance_cycle;
    use super::*;

    /// `inc`, `dec` and `add` accumulate on the same labelled series.
    #[test]
    fn test_basic_operations() {
        let ep1 = [("endpoint_id", "ep1")];
        let gauge = DynamicGaugeI64::new(4);

        gauge.inc(&ep1);
        gauge.add(&ep1, 2);
        assert_eq!(gauge.get(&ep1), 3);

        gauge.dec(&ep1);
        assert_eq!(gauge.get(&ep1), 2);

        gauge.add(&ep1, -2);
        assert_eq!(gauge.get(&ep1), 0);
    }

    /// A series handle and the label-based API observe the same value.
    #[test]
    fn test_series_handle() {
        let ep1 = [("endpoint_id", "ep1")];
        let gauge = DynamicGaugeI64::new(4);
        let handle = gauge.series(&ep1);

        handle.inc();
        handle.inc();
        handle.dec();

        assert_eq!(handle.get(), 1);
        assert_eq!(gauge.get(&ep1), 1);
    }

    /// `snapshot` returns one entry per series with its current value.
    #[test]
    fn test_snapshot() {
        let gauge = DynamicGaugeI64::new(4);
        gauge.add(&[("endpoint_id", "ep1")], 10);
        gauge.add(&[("endpoint_id", "ep2")], 20);

        let entries = gauge.snapshot();
        assert_eq!(entries.len(), 2);

        let total: i64 = entries.iter().map(|entry| entry.1).sum();
        assert_eq!(total, 30);
    }

    /// A series nobody references is removed once it is older than the staleness limit.
    #[cfg(feature = "eviction")]
    #[test]
    fn test_evict_stale() {
        let gauge = DynamicGaugeI64::new(4);
        let stale_labels = &[("endpoint_id", "evict_i64")];

        gauge.add(stale_labels, 5);
        assert_eq!(gauge.cardinality(), 1);

        for _ in 0..2 {
            advance_cycle();
        }

        // Touch a second, fresh series before evicting.
        gauge.add(&[("flush", "cache")], 1);

        let removed = gauge.evict_stale(1);
        assert_eq!(removed, 1);
        // Only the fresh series is left in the index.
        assert_eq!(gauge.cardinality(), 1);
        assert_eq!(gauge.get(stale_labels), 0);
    }

    /// A live handle keeps its series from being evicted.
    #[cfg(feature = "eviction")]
    #[test]
    fn test_series_handle_protects_from_eviction() {
        let gauge = DynamicGaugeI64::new(4);
        let held_labels = &[("endpoint_id", "tombstone_i64")];

        let handle = gauge.series(held_labels);
        handle.add(5);
        assert!(!handle.is_evicted());

        for _ in 0..2 {
            advance_cycle();
        }
        let removed = gauge.evict_stale(1);

        assert_eq!(removed, 0);
        assert!(!handle.is_evicted());
        assert_eq!(gauge.get(held_labels), 5);
    }

    /// With the cap reached, a new label set is routed to the overflow series.
    #[test]
    fn test_overflow_bucket_routes_new_series_at_capacity() {
        let gauge = DynamicGaugeI64::with_max_series(4, 1);
        gauge.add(&[("endpoint_id", "1")], 1);
        gauge.add(&[("endpoint_id", "2")], 2);

        let overflow_labels = [(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)];
        assert_eq!(gauge.cardinality(), 2);
        assert_eq!(gauge.get(&overflow_labels), 2);
    }
}