fast_telemetry/metric/dynamic/
gauge.rs1use super::cache::{CacheableSeries, LabelCache, SERIES_CACHE_SIZE};
4#[cfg(feature = "eviction")]
5use super::current_cycle;
6use super::{DynamicLabelSet, GAUGE_IDS};
7use crossbeam_utils::CachePadded;
8use parking_lot::RwLock;
9use std::cell::RefCell;
10use std::collections::HashMap;
11use std::hash::{Hash, Hasher};
12#[cfg(feature = "eviction")]
13use std::sync::atomic::AtomicU32;
14use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
15use std::sync::{Arc, Weak};
16
/// Series cap applied when a gauge is built with `DynamicGauge::new`.
const DEFAULT_MAX_SERIES: usize = 2000;
/// Label key of the shared series that `lookup_or_create` falls back to once
/// the series cap has been reached.
const OVERFLOW_LABEL_KEY: &str = "__ft_overflow";
/// Label value paired with `OVERFLOW_LABEL_KEY` on the overflow series.
const OVERFLOW_LABEL_VALUE: &str = "true";

/// One shard of the label index: a cache-padded read/write lock around a map
/// from a label set to its shared series state.
type GaugeIndexShard = CachePadded<RwLock<HashMap<DynamicLabelSet, Arc<GaugeSeries>>>>;
22
/// Shared state of a single gauge series (one unique label set).
struct GaugeSeries {
    /// Current gauge reading, stored as the raw bit pattern of an `f64` so it
    /// can live in an atomic (`set` writes `to_bits`, `get` reads `from_bits`).
    bits: CachePadded<AtomicU64>,
    /// Set by `mark_evicted` when the series is removed from the index.
    evicted: AtomicBool,
    /// Cycle recorded at creation and on every `touch`; `evict_stale` compares
    /// it with the current cycle to decide whether the series is stale.
    #[cfg(feature = "eviction")]
    last_accessed_cycle: AtomicU32,
}
31
32impl GaugeSeries {
33 #[cfg(feature = "eviction")]
34 fn new(cycle: u32) -> Self {
35 Self {
36 bits: CachePadded::new(AtomicU64::new(0.0_f64.to_bits())),
37 evicted: AtomicBool::new(false),
38 last_accessed_cycle: AtomicU32::new(cycle),
39 }
40 }
41
42 #[cfg(not(feature = "eviction"))]
43 fn new() -> Self {
44 Self {
45 bits: CachePadded::new(AtomicU64::new(0.0_f64.to_bits())),
46 evicted: AtomicBool::new(false),
47 }
48 }
49
50 #[inline]
51 fn set(&self, value: f64) {
52 self.bits.store(value.to_bits(), Ordering::Relaxed);
53 }
56
57 #[cfg(feature = "eviction")]
59 #[inline]
60 fn touch(&self, cycle: u32) {
61 self.last_accessed_cycle.store(cycle, Ordering::Relaxed);
62 }
63
64 #[inline]
65 fn get(&self) -> f64 {
66 f64::from_bits(self.bits.load(Ordering::Relaxed))
67 }
68
69 #[inline]
70 fn is_evicted(&self) -> bool {
71 self.evicted.load(Ordering::Relaxed)
72 }
73
74 #[cfg(feature = "eviction")]
75 fn mark_evicted(&self) {
76 self.evicted.store(true, Ordering::Relaxed);
77 }
78}
79
80impl CacheableSeries for GaugeSeries {
81 fn is_evicted(&self) -> bool {
82 self.is_evicted()
83 }
84}
85
/// Cloneable handle to one resolved gauge series, returned by
/// `DynamicGauge::series`.
///
/// The handle owns a strong `Arc` to the series state, so reads and writes
/// through it go straight to the atomic without any label lookup.
#[derive(Clone)]
pub struct DynamicGaugeSeries {
    /// Shared series state; the same allocation the gauge's index points at.
    series: Arc<GaugeSeries>,
}
95
impl DynamicGaugeSeries {
    /// Stores `value` as the current reading of this series.
    #[inline]
    pub fn set(&self, value: f64) {
        self.series.set(value);
    }

    /// Returns the current reading of this series.
    #[inline]
    pub fn get(&self) -> f64 {
        self.series.get()
    }

    /// Returns `true` if the underlying series has been marked evicted.
    #[inline]
    pub fn is_evicted(&self) -> bool {
        self.series.is_evicted()
    }
}
115
thread_local! {
    /// Per-thread cache of recently resolved series, consulted by
    /// `cached_series` before the sharded index is touched.
    ///
    /// Lookups and inserts pass the owning gauge's `id` together with the
    /// labels. Entries are `Weak` references (`update_cache` stores
    /// `Arc::downgrade`), so the cache never adds to a series' strong count.
    static SERIES_CACHE: RefCell<LabelCache<Weak<GaugeSeries>, SERIES_CACHE_SIZE>> =
        RefCell::new(LabelCache::new());
}
120
/// Gauge whose series are keyed by runtime label sets, indexed in sharded
/// lock-protected maps with a thread-local lookup cache in front.
pub struct DynamicGauge {
    /// Identifier drawn from `GAUGE_IDS` at construction; passed to the
    /// thread-local series cache on every lookup and insert.
    id: usize,
    /// Maximum number of indexed series before new label sets are routed to
    /// the overflow series; `0` disables the check.
    max_series: usize,
    /// `shard_count - 1`; ANDed with a key's hash to select its shard.
    shard_mask: usize,
    /// The shards of the label index (length is a power of two).
    index_shards: Vec<GaugeIndexShard>,
    /// Number of series currently indexed: incremented on insert, decremented
    /// on eviction.
    series_count: AtomicUsize,
    /// Number of lookups that were redirected to the overflow series.
    overflow_count: AtomicU64,
}
134
135impl DynamicGauge {
136 pub fn new(shard_count: usize) -> Self {
138 Self::with_max_series(shard_count, DEFAULT_MAX_SERIES)
139 }
140
141 pub fn with_max_series(shard_count: usize, max_series: usize) -> Self {
149 let shard_count = shard_count.next_power_of_two();
150 let id = GAUGE_IDS.fetch_add(1, Ordering::Relaxed);
151 Self {
152 id,
153 max_series,
154 shard_mask: shard_count - 1,
155 index_shards: (0..shard_count)
156 .map(|_| CachePadded::new(RwLock::new(HashMap::new())))
157 .collect(),
158 series_count: AtomicUsize::new(0),
159 overflow_count: AtomicU64::new(0),
160 }
161 }
162
163 pub fn series(&self, labels: &[(&str, &str)]) -> DynamicGaugeSeries {
167 if let Some(series) = self.cached_series(labels) {
168 return DynamicGaugeSeries { series };
169 }
170 let series = self.lookup_or_create(labels);
171 self.update_cache(labels, &series);
172 DynamicGaugeSeries { series }
173 }
174
175 #[inline]
177 pub fn set(&self, labels: &[(&str, &str)], value: f64) {
178 if let Some(series) = self.cached_series(labels) {
179 series.set(value);
180 return;
181 }
182
183 let series = self.lookup_or_create(labels);
184 self.update_cache(labels, &series);
185 series.set(value);
186 }
187
188 pub fn get(&self, labels: &[(&str, &str)]) -> f64 {
190 let key = DynamicLabelSet::from_pairs(labels);
191 let index_shard = self.index_shard_for(&key);
192 self.index_shards[index_shard]
193 .read()
194 .get(&key)
195 .map(|series| series.get())
196 .unwrap_or(0.0)
197 }
198
199 pub fn snapshot(&self) -> Vec<(DynamicLabelSet, f64)> {
201 let mut out = Vec::new();
202 for shard in &self.index_shards {
203 let guard = shard.read();
204 for (labels, series) in guard.iter() {
205 out.push((labels.clone(), series.get()));
206 }
207 }
208 out
209 }
210
211 pub fn cardinality(&self) -> usize {
213 self.index_shards
214 .iter()
215 .map(|shard| shard.read().len())
216 .sum()
217 }
218
219 pub fn overflow_count(&self) -> u64 {
224 self.overflow_count.load(Ordering::Relaxed)
225 }
226
227 pub(crate) fn visit_series(&self, mut f: impl FnMut(&[(String, String)], f64)) {
232 for shard in &self.index_shards {
233 let guard = shard.read();
234 for (labels, series) in guard.iter() {
235 f(labels.pairs(), series.get());
236 }
237 }
238 }
239
240 #[cfg(feature = "eviction")]
251 pub fn evict_stale(&self, max_staleness: u32) -> usize {
252 let cycle = current_cycle();
253 let mut removed = 0;
254
255 for shard in &self.index_shards {
256 let mut guard = shard.write();
257 guard.retain(|_labels, series| {
258 if Arc::strong_count(series) > 1 {
261 return true;
262 }
263 let last = series.last_accessed_cycle.load(Ordering::Relaxed);
265 let stale = cycle.saturating_sub(last) > max_staleness;
266 if stale {
267 series.mark_evicted();
268 removed += 1;
269 self.series_count.fetch_sub(1, Ordering::Relaxed);
270 }
271 !stale
272 });
273 }
274
275 removed
276 }
277
278 fn lookup_or_create(&self, labels: &[(&str, &str)]) -> Arc<GaugeSeries> {
279 let requested_key = DynamicLabelSet::from_pairs(labels);
280 let requested_shard = self.index_shard_for(&requested_key);
281 #[cfg(feature = "eviction")]
282 let cycle = current_cycle();
283
284 if let Some(series) = self.index_shards[requested_shard]
286 .read()
287 .get(&requested_key)
288 {
289 #[cfg(feature = "eviction")]
290 series.touch(cycle);
291 return Arc::clone(series);
292 }
293
294 let key = if self.max_series > 0
296 && self.series_count.load(Ordering::Relaxed) >= self.max_series
297 {
298 self.overflow_count.fetch_add(1, Ordering::Relaxed);
299 DynamicLabelSet::from_pairs(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)])
300 } else {
301 requested_key
302 };
303 let shard = self.index_shard_for(&key);
304
305 if let Some(series) = self.index_shards[shard].read().get(&key) {
306 #[cfg(feature = "eviction")]
307 series.touch(cycle);
308 return Arc::clone(series);
309 }
310
311 let mut guard = self.index_shards[shard].write();
312 if let Some(series) = guard.get(&key) {
313 #[cfg(feature = "eviction")]
314 series.touch(cycle);
315 return Arc::clone(series);
316 }
317 #[cfg(feature = "eviction")]
318 let series = Arc::new(GaugeSeries::new(cycle));
319 #[cfg(not(feature = "eviction"))]
320 let series = Arc::new(GaugeSeries::new());
321 guard.insert(key, Arc::clone(&series));
322 self.series_count.fetch_add(1, Ordering::Relaxed);
323 series
324 }
325
326 fn index_shard_for(&self, key: &DynamicLabelSet) -> usize {
327 let mut hasher = std::collections::hash_map::DefaultHasher::new();
328 key.hash(&mut hasher);
329 (hasher.finish() as usize) & self.shard_mask
330 }
331
332 fn cached_series(&self, labels: &[(&str, &str)]) -> Option<Arc<GaugeSeries>> {
333 SERIES_CACHE.with(|cache| {
334 let series = cache.borrow_mut().get(self.id, labels)?;
335 #[cfg(feature = "eviction")]
336 series.touch(current_cycle());
337 Some(series)
338 })
339 }
340
341 fn update_cache(&self, labels: &[(&str, &str)], series: &Arc<GaugeSeries>) {
342 SERIES_CACHE.with(|cache| {
343 cache
344 .borrow_mut()
345 .insert(self.id, labels, Arc::downgrade(series));
346 });
347 }
348}
349
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `actual` is within `f64::EPSILON` of `expected`.
    fn assert_close(actual: f64, expected: f64) {
        assert!((actual - expected).abs() < f64::EPSILON);
    }

    /// A value written through `set` is readable through `get` with the same
    /// labels.
    #[test]
    fn test_basic_operations() {
        let labels = [("org_id", "42"), ("endpoint_uuid", "abc")];
        let gauge = DynamicGauge::new(4);
        gauge.set(&labels, 100.5);

        assert_close(gauge.get(&labels), 100.5);
    }

    /// The same label pairs in a different order address the same series.
    #[test]
    fn test_label_order_is_canonicalized() {
        let gauge = DynamicGauge::new(4);
        gauge.set(&[("org_id", "42"), ("endpoint_uuid", "abc")], 50.0);

        let reordered = [("endpoint_uuid", "abc"), ("org_id", "42")];
        assert_close(gauge.get(&reordered), 50.0);
    }

    /// A handle from `series` writes to the same series the gauge reads.
    #[test]
    fn test_series_handle() {
        let labels = [("org_id", "42"), ("endpoint_uuid", "abc")];
        let gauge = DynamicGauge::new(4);
        let handle = gauge.series(&labels);
        handle.set(123.456);

        assert_close(handle.get(), 123.456);
        assert_close(gauge.get(&labels), 123.456);
    }

    /// `snapshot` returns one entry per series with its current value.
    #[test]
    fn test_snapshot() {
        let gauge = DynamicGauge::new(4);
        gauge.set(&[("org_id", "1")], 10.0);
        gauge.set(&[("org_id", "2")], 20.0);

        let snap = gauge.snapshot();
        assert_eq!(snap.len(), 2);

        let total = snap.iter().map(|entry| entry.1).sum::<f64>();
        assert_close(total, 30.0);
    }

    /// With a cap of one, the second label set lands in the overflow series.
    #[test]
    fn test_overflow_bucket_routes_new_series_at_capacity() {
        let gauge = DynamicGauge::with_max_series(4, 1);
        gauge.set(&[("org_id", "1")], 1.0);
        gauge.set(&[("org_id", "2")], 2.0);

        // One real series plus the overflow series.
        assert_eq!(gauge.cardinality(), 2);
        let overflow_labels = [(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)];
        assert_close(gauge.get(&overflow_labels), 2.0);
    }
}
411}