fast_telemetry/metric/dynamic/
gauge.rs1use super::cache::{CacheableSeries, LabelCache, SERIES_CACHE_SIZE};
4#[cfg(feature = "eviction")]
5use super::current_cycle;
6use super::{DynamicLabelSet, GAUGE_IDS};
7use crossbeam_utils::CachePadded;
8use parking_lot::RwLock;
9use std::cell::RefCell;
10use std::collections::HashMap;
11use std::hash::{Hash, Hasher};
12#[cfg(feature = "eviction")]
13use std::sync::atomic::AtomicU32;
14use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
15use std::sync::{Arc, Weak};
16
17const DEFAULT_MAX_SERIES: usize = 2000;
18const OVERFLOW_LABEL_KEY: &str = "__ft_overflow";
19const OVERFLOW_LABEL_VALUE: &str = "true";
20
21type GaugeIndexShard = CachePadded<RwLock<HashMap<DynamicLabelSet, Arc<GaugeSeries>>>>;
22
23struct GaugeSeries {
24 bits: CachePadded<AtomicU64>,
25 evicted: AtomicBool,
27 #[cfg(feature = "eviction")]
29 last_accessed_cycle: AtomicU32,
30}
31
32impl GaugeSeries {
33 #[cfg(feature = "eviction")]
34 fn new(cycle: u32) -> Self {
35 Self {
36 bits: CachePadded::new(AtomicU64::new(0.0_f64.to_bits())),
37 evicted: AtomicBool::new(false),
38 last_accessed_cycle: AtomicU32::new(cycle),
39 }
40 }
41
42 #[cfg(not(feature = "eviction"))]
43 fn new() -> Self {
44 Self {
45 bits: CachePadded::new(AtomicU64::new(0.0_f64.to_bits())),
46 evicted: AtomicBool::new(false),
47 }
48 }
49
50 #[inline]
51 fn set(&self, value: f64) {
52 self.bits.store(value.to_bits(), Ordering::Relaxed);
53 }
56
57 #[cfg(feature = "eviction")]
59 #[inline]
60 fn touch(&self, cycle: u32) {
61 self.last_accessed_cycle.store(cycle, Ordering::Relaxed);
62 }
63
64 #[inline]
65 fn get(&self) -> f64 {
66 f64::from_bits(self.bits.load(Ordering::Relaxed))
67 }
68
69 #[inline]
70 fn is_evicted(&self) -> bool {
71 self.evicted.load(Ordering::Relaxed)
72 }
73
74 #[cfg(feature = "eviction")]
75 fn mark_evicted(&self) {
76 self.evicted.store(true, Ordering::Relaxed);
77 }
78}
79
80impl CacheableSeries for GaugeSeries {
81 fn is_evicted(&self) -> bool {
82 self.is_evicted()
83 }
84}
85
86#[derive(Clone)]
92pub struct DynamicGaugeSeries {
93 series: Arc<GaugeSeries>,
94}
95
96impl DynamicGaugeSeries {
97 #[inline]
99 pub fn set(&self, value: f64) {
100 self.series.set(value);
101 }
102
103 #[inline]
105 pub fn get(&self) -> f64 {
106 self.series.get()
107 }
108
109 #[inline]
111 pub fn is_evicted(&self) -> bool {
112 self.series.is_evicted()
113 }
114}
115
116thread_local! {
117 static SERIES_CACHE: RefCell<LabelCache<Weak<GaugeSeries>, SERIES_CACHE_SIZE>> =
118 RefCell::new(LabelCache::new());
119}
120
121pub struct DynamicGauge {
125 id: usize,
126 max_series: usize,
127 shard_mask: usize,
128 index_shards: Vec<GaugeIndexShard>,
129 series_count: AtomicUsize,
131 overflow_count: AtomicU64,
133}
134
135impl DynamicGauge {
136 pub fn new(shard_count: usize) -> Self {
138 Self::with_max_series(shard_count, DEFAULT_MAX_SERIES)
139 }
140
141 pub fn with_max_series(shard_count: usize, max_series: usize) -> Self {
149 let shard_count = shard_count.next_power_of_two();
150 let id = GAUGE_IDS.fetch_add(1, Ordering::Relaxed);
151 Self {
152 id,
153 max_series,
154 shard_mask: shard_count - 1,
155 index_shards: (0..shard_count)
156 .map(|_| CachePadded::new(RwLock::new(HashMap::new())))
157 .collect(),
158 series_count: AtomicUsize::new(0),
159 overflow_count: AtomicU64::new(0),
160 }
161 }
162
163 pub fn series(&self, labels: &[(&str, &str)]) -> DynamicGaugeSeries {
167 if let Some(series) = self.cached_series(labels) {
168 return DynamicGaugeSeries { series };
169 }
170 let series = self.lookup_or_create(labels);
171 self.update_cache(labels, &series);
172 DynamicGaugeSeries { series }
173 }
174
175 #[inline]
177 pub fn set(&self, labels: &[(&str, &str)], value: f64) {
178 if let Some(series) = self.cached_series(labels) {
179 series.set(value);
180 return;
181 }
182
183 let series = self.lookup_or_create(labels);
184 self.update_cache(labels, &series);
185 series.set(value);
186 }
187
188 pub fn get(&self, labels: &[(&str, &str)]) -> f64 {
190 let key = DynamicLabelSet::from_pairs(labels);
191 let index_shard = self.index_shard_for(&key);
192 self.index_shards[index_shard]
193 .read()
194 .get(&key)
195 .map(|series| series.get())
196 .unwrap_or(0.0)
197 }
198
199 pub fn snapshot(&self) -> Vec<(DynamicLabelSet, f64)> {
201 let mut out = Vec::new();
202 for shard in &self.index_shards {
203 let guard = shard.read();
204 for (labels, series) in guard.iter() {
205 out.push((labels.clone(), series.get()));
206 }
207 }
208 out
209 }
210
211 pub fn cardinality(&self) -> usize {
213 self.index_shards
214 .iter()
215 .map(|shard| shard.read().len())
216 .sum()
217 }
218
219 pub fn overflow_count(&self) -> u64 {
224 self.overflow_count.load(Ordering::Relaxed)
225 }
226
227 #[doc(hidden)]
234 pub fn visit_series(&self, mut f: impl FnMut(&[(String, String)], f64)) {
235 for shard in &self.index_shards {
236 let guard = shard.read();
237 for (labels, series) in guard.iter() {
238 f(labels.pairs(), series.get());
239 }
240 }
241 }
242
243 #[cfg(feature = "eviction")]
254 pub fn evict_stale(&self, max_staleness: u32) -> usize {
255 let cycle = current_cycle();
256 let mut removed = 0;
257
258 for shard in &self.index_shards {
259 let mut guard = shard.write();
260 guard.retain(|_labels, series| {
261 if Arc::strong_count(series) > 1 {
264 return true;
265 }
266 let last = series.last_accessed_cycle.load(Ordering::Relaxed);
268 let stale = cycle.saturating_sub(last) > max_staleness;
269 if stale {
270 series.mark_evicted();
271 removed += 1;
272 self.series_count.fetch_sub(1, Ordering::Relaxed);
273 }
274 !stale
275 });
276 }
277
278 removed
279 }
280
281 fn lookup_or_create(&self, labels: &[(&str, &str)]) -> Arc<GaugeSeries> {
282 let requested_key = DynamicLabelSet::from_pairs(labels);
283 let requested_shard = self.index_shard_for(&requested_key);
284 #[cfg(feature = "eviction")]
285 let cycle = current_cycle();
286
287 if let Some(series) = self.index_shards[requested_shard]
289 .read()
290 .get(&requested_key)
291 {
292 #[cfg(feature = "eviction")]
293 series.touch(cycle);
294 return Arc::clone(series);
295 }
296
297 let key = if self.max_series > 0
299 && self.series_count.load(Ordering::Relaxed) >= self.max_series
300 {
301 self.overflow_count.fetch_add(1, Ordering::Relaxed);
302 DynamicLabelSet::from_pairs(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)])
303 } else {
304 requested_key
305 };
306 let shard = self.index_shard_for(&key);
307
308 if let Some(series) = self.index_shards[shard].read().get(&key) {
309 #[cfg(feature = "eviction")]
310 series.touch(cycle);
311 return Arc::clone(series);
312 }
313
314 let mut guard = self.index_shards[shard].write();
315 if let Some(series) = guard.get(&key) {
316 #[cfg(feature = "eviction")]
317 series.touch(cycle);
318 return Arc::clone(series);
319 }
320 #[cfg(feature = "eviction")]
321 let series = Arc::new(GaugeSeries::new(cycle));
322 #[cfg(not(feature = "eviction"))]
323 let series = Arc::new(GaugeSeries::new());
324 guard.insert(key, Arc::clone(&series));
325 self.series_count.fetch_add(1, Ordering::Relaxed);
326 series
327 }
328
329 fn index_shard_for(&self, key: &DynamicLabelSet) -> usize {
330 let mut hasher = std::collections::hash_map::DefaultHasher::new();
331 key.hash(&mut hasher);
332 (hasher.finish() as usize) & self.shard_mask
333 }
334
335 fn cached_series(&self, labels: &[(&str, &str)]) -> Option<Arc<GaugeSeries>> {
336 SERIES_CACHE.with(|cache| {
337 let series = cache.borrow_mut().get(self.id, labels)?;
338 #[cfg(feature = "eviction")]
339 series.touch(current_cycle());
340 Some(series)
341 })
342 }
343
344 fn update_cache(&self, labels: &[(&str, &str)], series: &Arc<GaugeSeries>) {
345 SERIES_CACHE.with(|cache| {
346 cache
347 .borrow_mut()
348 .insert(self.id, labels, Arc::downgrade(series));
349 });
350 }
351}
352
353#[cfg(test)]
354mod tests {
355 use super::*;
356
357 #[test]
358 fn test_basic_operations() {
359 let gauge = DynamicGauge::new(4);
360 gauge.set(&[("org_id", "42"), ("endpoint_uuid", "abc")], 100.5);
361
362 assert!(
363 (gauge.get(&[("org_id", "42"), ("endpoint_uuid", "abc")]) - 100.5).abs() < f64::EPSILON
364 );
365 }
366
367 #[test]
368 fn test_label_order_is_canonicalized() {
369 let gauge = DynamicGauge::new(4);
370 gauge.set(&[("org_id", "42"), ("endpoint_uuid", "abc")], 50.0);
371
372 assert!(
373 (gauge.get(&[("endpoint_uuid", "abc"), ("org_id", "42")]) - 50.0).abs() < f64::EPSILON
374 );
375 }
376
377 #[test]
378 fn test_series_handle() {
379 let gauge = DynamicGauge::new(4);
380 let series = gauge.series(&[("org_id", "42"), ("endpoint_uuid", "abc")]);
381 series.set(123.456);
382
383 assert!((series.get() - 123.456).abs() < f64::EPSILON);
384 assert!(
385 (gauge.get(&[("org_id", "42"), ("endpoint_uuid", "abc")]) - 123.456).abs()
386 < f64::EPSILON
387 );
388 }
389
390 #[test]
391 fn test_snapshot() {
392 let gauge = DynamicGauge::new(4);
393 gauge.set(&[("org_id", "1")], 10.0);
394 gauge.set(&[("org_id", "2")], 20.0);
395
396 let snap = gauge.snapshot();
397 assert_eq!(snap.len(), 2);
398
399 let total: f64 = snap.iter().map(|(_, v)| v).sum();
400 assert!((total - 30.0).abs() < f64::EPSILON);
401 }
402
403 #[test]
404 fn test_overflow_bucket_routes_new_series_at_capacity() {
405 let gauge = DynamicGauge::with_max_series(4, 1);
406 gauge.set(&[("org_id", "1")], 1.0);
407 gauge.set(&[("org_id", "2")], 2.0);
408
409 assert_eq!(gauge.cardinality(), 2);
410 assert!(
411 (gauge.get(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)]) - 2.0).abs() < f64::EPSILON
412 );
413 }
414}