Skip to main content

liminal/metrics/registry/
collectors.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
3
4use super::{HistogramBucketSnapshot, HistogramSnapshot, HistogramValue};
5
6#[derive(Debug)]
7pub(super) struct CounterMetric {
8    value: AtomicU64,
9}
10
11impl CounterMetric {
12    pub(super) const fn new() -> Self {
13        Self {
14            value: AtomicU64::new(0),
15        }
16    }
17
18    pub(super) fn snapshot(&self) -> u64 {
19        self.value.load(Ordering::Relaxed)
20    }
21}
22
23#[derive(Debug)]
24pub(super) struct GaugeMetric {
25    value: AtomicI64,
26}
27
28impl GaugeMetric {
29    pub(super) const fn new() -> Self {
30        Self {
31            value: AtomicI64::new(0),
32        }
33    }
34
35    pub(super) fn snapshot(&self) -> i64 {
36        self.value.load(Ordering::Relaxed)
37    }
38}
39
40#[derive(Debug)]
41pub(super) struct HistogramMetric {
42    boundaries: Vec<f64>,
43    counts: Vec<AtomicU64>,
44    sum_bits: AtomicU64,
45}
46
47impl HistogramMetric {
48    pub(super) fn new(boundaries: Vec<f64>) -> Self {
49        let counts_len = boundaries.len().saturating_add(1);
50        let counts = (0..counts_len).map(|_| AtomicU64::new(0)).collect();
51
52        Self {
53            boundaries,
54            counts,
55            sum_bits: AtomicU64::new(0.0_f64.to_bits()),
56        }
57    }
58
59    pub(super) fn boundaries(&self) -> &[f64] {
60        &self.boundaries
61    }
62
63    pub(super) fn snapshot(&self) -> HistogramSnapshot {
64        let buckets = self
65            .counts
66            .iter()
67            .enumerate()
68            .map(|(index, count)| HistogramBucketSnapshot {
69                upper_bound: self.boundaries.get(index).copied(),
70                count: count.load(Ordering::Relaxed),
71            })
72            .collect();
73        let sum = f64::from_bits(self.sum_bits.load(Ordering::Relaxed));
74
75        HistogramSnapshot { buckets, sum }
76    }
77
78    fn bucket_index(&self, value: f64) -> usize {
79        for (index, boundary) in self.boundaries.iter().enumerate() {
80            if value <= *boundary {
81                return index;
82            }
83        }
84
85        self.boundaries.len()
86    }
87}
88
89#[derive(Clone, Debug)]
90pub struct CounterHandle {
91    pub(super) metric: Arc<CounterMetric>,
92}
93
94impl CounterHandle {
95    #[must_use]
96    pub(super) fn noop() -> Self {
97        let metric = Arc::new(CounterMetric::new());
98        Self { metric }
99    }
100
101    pub fn increment(&self) {
102        self.increment_by(1);
103    }
104
105    pub fn increment_by(&self, amount: u64) {
106        let _ = self
107            .metric
108            .value
109            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
110                Some(current.saturating_add(amount))
111            });
112    }
113
114    #[must_use]
115    pub fn value(&self) -> u64 {
116        self.metric.value.load(Ordering::Relaxed)
117    }
118}
119
120#[derive(Clone, Debug)]
121pub struct GaugeHandle {
122    pub(super) metric: Arc<GaugeMetric>,
123}
124
125impl GaugeHandle {
126    #[must_use]
127    pub(super) fn noop() -> Self {
128        let metric = Arc::new(GaugeMetric::new());
129        Self { metric }
130    }
131
132    pub fn set(&self, value: i64) {
133        self.metric.value.store(value, Ordering::Relaxed);
134    }
135
136    pub fn increment(&self) {
137        self.increment_by(1);
138    }
139
140    pub fn increment_by(&self, amount: i64) {
141        self.metric.value.fetch_add(amount, Ordering::Relaxed);
142    }
143
144    pub fn decrement(&self) {
145        self.decrement_by(1);
146    }
147
148    pub fn decrement_by(&self, amount: i64) {
149        self.metric.value.fetch_sub(amount, Ordering::Relaxed);
150    }
151
152    #[must_use]
153    pub fn value(&self) -> i64 {
154        self.metric.value.load(Ordering::Relaxed)
155    }
156}
157
158#[derive(Clone, Debug)]
159pub struct HistogramHandle {
160    pub(super) metric: Arc<HistogramMetric>,
161}
162
163impl HistogramHandle {
164    #[must_use]
165    pub(super) fn noop(boundaries: Vec<f64>) -> Self {
166        let metric = Arc::new(HistogramMetric::new(boundaries));
167        Self { metric }
168    }
169
170    pub fn observe<Value>(&self, value: Value)
171    where
172        Value: HistogramValue,
173    {
174        let value = value.into_f64();
175        if !value.is_finite() {
176            return;
177        }
178        let bucket_index = self.metric.bucket_index(value);
179        if let Some(count) = self.metric.counts.get(bucket_index) {
180            let _ = self.metric.sum_bits.fetch_update(
181                Ordering::Relaxed,
182                Ordering::Relaxed,
183                |current| Some((f64::from_bits(current) + value).to_bits()),
184            );
185            count.fetch_add(1, Ordering::Relaxed);
186        }
187    }
188
189    #[must_use]
190    pub fn boundaries(&self) -> &[f64] {
191        self.metric.boundaries()
192    }
193}