fast_telemetry/metric/dynamic/
gauge.rs1#[cfg(feature = "eviction")]
4use super::current_cycle;
5use super::{DynamicLabelSet, GAUGE_IDS};
6use crossbeam_utils::CachePadded;
7use parking_lot::RwLock;
8use std::cell::RefCell;
9use std::collections::HashMap;
10use std::hash::{Hash, Hasher};
11#[cfg(feature = "eviction")]
12use std::sync::atomic::AtomicU32;
13use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
14use std::sync::{Arc, Weak};
15
/// Series cap used by `DynamicGauge::new` when the caller does not pick one.
const DEFAULT_MAX_SERIES: usize = 2_000;
/// Label key of the shared series that absorbs new label sets once the cap is reached.
const OVERFLOW_LABEL_KEY: &str = "__ft_overflow";
/// Label value paired with `OVERFLOW_LABEL_KEY` on the overflow series.
const OVERFLOW_LABEL_VALUE: &str = "true";
19
20type GaugeIndexShard = CachePadded<RwLock<HashMap<DynamicLabelSet, Arc<GaugeSeries>>>>;
21
22struct GaugeSeries {
23 bits: CachePadded<AtomicU64>,
24 evicted: AtomicBool,
26 #[cfg(feature = "eviction")]
28 last_accessed_cycle: AtomicU32,
29}
30
31impl GaugeSeries {
32 #[cfg(feature = "eviction")]
33 fn new(cycle: u32) -> Self {
34 Self {
35 bits: CachePadded::new(AtomicU64::new(0.0_f64.to_bits())),
36 evicted: AtomicBool::new(false),
37 last_accessed_cycle: AtomicU32::new(cycle),
38 }
39 }
40
41 #[cfg(not(feature = "eviction"))]
42 fn new() -> Self {
43 Self {
44 bits: CachePadded::new(AtomicU64::new(0.0_f64.to_bits())),
45 evicted: AtomicBool::new(false),
46 }
47 }
48
49 #[inline]
50 fn set(&self, value: f64) {
51 self.bits.store(value.to_bits(), Ordering::Relaxed);
52 }
55
56 #[cfg(feature = "eviction")]
58 #[inline]
59 fn touch(&self, cycle: u32) {
60 self.last_accessed_cycle.store(cycle, Ordering::Relaxed);
61 }
62
63 #[inline]
64 fn get(&self) -> f64 {
65 f64::from_bits(self.bits.load(Ordering::Relaxed))
66 }
67
68 #[inline]
69 fn is_evicted(&self) -> bool {
70 self.evicted.load(Ordering::Relaxed)
71 }
72
73 #[cfg(feature = "eviction")]
74 fn mark_evicted(&self) {
75 self.evicted.store(true, Ordering::Relaxed);
76 }
77}
78
79#[derive(Clone)]
85pub struct DynamicGaugeSeries {
86 series: Arc<GaugeSeries>,
87}
88
89impl DynamicGaugeSeries {
90 #[inline]
92 pub fn set(&self, value: f64) {
93 self.series.set(value);
94 }
95
96 #[inline]
98 pub fn get(&self) -> f64 {
99 self.series.get()
100 }
101
102 #[inline]
104 pub fn is_evicted(&self) -> bool {
105 self.series.is_evicted()
106 }
107}
108
109struct SeriesCacheEntry {
110 gauge_id: usize,
111 ordered_labels: Vec<(String, String)>,
112 series: Weak<GaugeSeries>,
113}
114
115thread_local! {
116 static SERIES_CACHE: RefCell<Option<SeriesCacheEntry>> = const { RefCell::new(None) };
117}
118
119pub struct DynamicGauge {
123 id: usize,
124 max_series: usize,
125 shard_mask: usize,
126 index_shards: Vec<GaugeIndexShard>,
127 series_count: AtomicUsize,
129 overflow_count: AtomicU64,
131}
132
133impl DynamicGauge {
134 pub fn new(shard_count: usize) -> Self {
136 Self::with_max_series(shard_count, DEFAULT_MAX_SERIES)
137 }
138
139 pub fn with_max_series(shard_count: usize, max_series: usize) -> Self {
147 let shard_count = shard_count.next_power_of_two();
148 let id = GAUGE_IDS.fetch_add(1, Ordering::Relaxed);
149 Self {
150 id,
151 max_series,
152 shard_mask: shard_count - 1,
153 index_shards: (0..shard_count)
154 .map(|_| CachePadded::new(RwLock::new(HashMap::new())))
155 .collect(),
156 series_count: AtomicUsize::new(0),
157 overflow_count: AtomicU64::new(0),
158 }
159 }
160
161 pub fn series(&self, labels: &[(&str, &str)]) -> DynamicGaugeSeries {
165 if let Some(series) = self.cached_series(labels) {
166 return DynamicGaugeSeries { series };
167 }
168 let series = self.lookup_or_create(labels);
169 self.update_cache(labels, Arc::clone(&series));
170 DynamicGaugeSeries { series }
171 }
172
173 #[inline]
175 pub fn set(&self, labels: &[(&str, &str)], value: f64) {
176 if let Some(series) = self.cached_series(labels) {
177 series.set(value);
178 return;
179 }
180
181 let series = self.lookup_or_create(labels);
182 self.update_cache(labels, Arc::clone(&series));
183 series.set(value);
184 }
185
186 pub fn get(&self, labels: &[(&str, &str)]) -> f64 {
188 let key = DynamicLabelSet::from_pairs(labels);
189 let index_shard = self.index_shard_for(&key);
190 self.index_shards[index_shard]
191 .read()
192 .get(&key)
193 .map(|series| series.get())
194 .unwrap_or(0.0)
195 }
196
197 pub fn snapshot(&self) -> Vec<(DynamicLabelSet, f64)> {
199 let mut out = Vec::new();
200 for shard in &self.index_shards {
201 let guard = shard.read();
202 for (labels, series) in guard.iter() {
203 out.push((labels.clone(), series.get()));
204 }
205 }
206 out
207 }
208
209 pub fn cardinality(&self) -> usize {
211 self.index_shards
212 .iter()
213 .map(|shard| shard.read().len())
214 .sum()
215 }
216
217 pub fn overflow_count(&self) -> u64 {
222 self.overflow_count.load(Ordering::Relaxed)
223 }
224
225 pub(crate) fn visit_series(&self, mut f: impl FnMut(&[(String, String)], f64)) {
230 for shard in &self.index_shards {
231 let guard = shard.read();
232 for (labels, series) in guard.iter() {
233 f(labels.pairs(), series.get());
234 }
235 }
236 }
237
238 #[cfg(feature = "eviction")]
249 pub fn evict_stale(&self, max_staleness: u32) -> usize {
250 let cycle = current_cycle();
251 let mut removed = 0;
252
253 for shard in &self.index_shards {
254 let mut guard = shard.write();
255 guard.retain(|_labels, series| {
256 if Arc::strong_count(series) > 1 {
259 return true;
260 }
261 let last = series.last_accessed_cycle.load(Ordering::Relaxed);
263 let stale = cycle.saturating_sub(last) > max_staleness;
264 if stale {
265 series.mark_evicted();
266 removed += 1;
267 self.series_count.fetch_sub(1, Ordering::Relaxed);
268 }
269 !stale
270 });
271 }
272
273 removed
274 }
275
276 fn lookup_or_create(&self, labels: &[(&str, &str)]) -> Arc<GaugeSeries> {
277 let requested_key = DynamicLabelSet::from_pairs(labels);
278 let requested_shard = self.index_shard_for(&requested_key);
279 #[cfg(feature = "eviction")]
280 let cycle = current_cycle();
281
282 if let Some(series) = self.index_shards[requested_shard]
284 .read()
285 .get(&requested_key)
286 {
287 #[cfg(feature = "eviction")]
288 series.touch(cycle);
289 return Arc::clone(series);
290 }
291
292 let key = if self.max_series > 0
294 && self.series_count.load(Ordering::Relaxed) >= self.max_series
295 {
296 self.overflow_count.fetch_add(1, Ordering::Relaxed);
297 DynamicLabelSet::from_pairs(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)])
298 } else {
299 requested_key
300 };
301 let shard = self.index_shard_for(&key);
302
303 if let Some(series) = self.index_shards[shard].read().get(&key) {
304 #[cfg(feature = "eviction")]
305 series.touch(cycle);
306 return Arc::clone(series);
307 }
308
309 let mut guard = self.index_shards[shard].write();
310 if let Some(series) = guard.get(&key) {
311 #[cfg(feature = "eviction")]
312 series.touch(cycle);
313 return Arc::clone(series);
314 }
315 #[cfg(feature = "eviction")]
316 let series = Arc::new(GaugeSeries::new(cycle));
317 #[cfg(not(feature = "eviction"))]
318 let series = Arc::new(GaugeSeries::new());
319 guard.insert(key, Arc::clone(&series));
320 self.series_count.fetch_add(1, Ordering::Relaxed);
321 series
322 }
323
324 fn index_shard_for(&self, key: &DynamicLabelSet) -> usize {
325 let mut hasher = std::collections::hash_map::DefaultHasher::new();
326 key.hash(&mut hasher);
327 (hasher.finish() as usize) & self.shard_mask
328 }
329
330 fn cached_series(&self, labels: &[(&str, &str)]) -> Option<Arc<GaugeSeries>> {
331 SERIES_CACHE.with(|cache| {
332 let cache_ref = cache.borrow();
333 let entry = cache_ref.as_ref()?;
334 if entry.gauge_id != self.id {
335 return None;
336 }
337 if entry.ordered_labels.len() != labels.len() {
338 return None;
339 }
340 for (idx, (k, v)) in labels.iter().enumerate() {
341 let (ek, ev) = &entry.ordered_labels[idx];
342 if ek != k || ev != v {
343 return None;
344 }
345 }
346 let series = entry.series.upgrade()?;
347 if series.is_evicted() {
348 return None;
349 }
350 #[cfg(feature = "eviction")]
351 series.touch(current_cycle());
352 Some(series)
353 })
354 }
355
356 fn update_cache(&self, labels: &[(&str, &str)], series: Arc<GaugeSeries>) {
357 SERIES_CACHE.with(|cache| {
358 let ordered_labels = labels
359 .iter()
360 .map(|(k, v)| ((*k).to_string(), (*v).to_string()))
361 .collect();
362 *cache.borrow_mut() = Some(SeriesCacheEntry {
363 gauge_id: self.id,
364 ordered_labels,
365 series: Arc::downgrade(&series),
366 });
367 });
368 }
369}
370
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `actual` is within `f64::EPSILON` of `expected`.
    fn assert_close(actual: f64, expected: f64) {
        assert!((actual - expected).abs() < f64::EPSILON);
    }

    /// A value written through `set` is read back through `get`.
    #[test]
    fn test_basic_operations() {
        let labels = [("org_id", "42"), ("endpoint_uuid", "abc")];
        let gauge = DynamicGauge::new(4);
        gauge.set(&labels, 100.5);

        assert_close(gauge.get(&labels), 100.5);
    }

    /// Reading with the same labels in a different order finds the same series.
    #[test]
    fn test_label_order_is_canonicalized() {
        let gauge = DynamicGauge::new(4);
        gauge.set(&[("org_id", "42"), ("endpoint_uuid", "abc")], 50.0);

        let reordered = [("endpoint_uuid", "abc"), ("org_id", "42")];
        assert_close(gauge.get(&reordered), 50.0);
    }

    /// A series handle writes to the same storage that `DynamicGauge::get` reads.
    #[test]
    fn test_series_handle() {
        let labels = [("org_id", "42"), ("endpoint_uuid", "abc")];
        let gauge = DynamicGauge::new(4);
        let handle = gauge.series(&labels);
        handle.set(123.456);

        assert_close(handle.get(), 123.456);
        assert_close(gauge.get(&labels), 123.456);
    }

    /// `snapshot` reports every series together with its value.
    #[test]
    fn test_snapshot() {
        let gauge = DynamicGauge::new(4);
        gauge.set(&[("org_id", "1")], 10.0);
        gauge.set(&[("org_id", "2")], 20.0);

        let snap = gauge.snapshot();
        assert_eq!(snap.len(), 2);

        let total = snap.iter().map(|(_, value)| *value).sum::<f64>();
        assert_close(total, 30.0);
    }

    /// With a cap of one series, the second label set lands in the overflow series.
    #[test]
    fn test_overflow_bucket_routes_new_series_at_capacity() {
        let gauge = DynamicGauge::with_max_series(4, 1);
        gauge.set(&[("org_id", "1")], 1.0);
        gauge.set(&[("org_id", "2")], 2.0);

        assert_eq!(gauge.cardinality(), 2);
        assert_close(
            gauge.get(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)]),
            2.0,
        );
    }
}