metrics_runtime/
common.rs

1use crate::data::AtomicWindowedHistogram;
2use arc_swap::ArcSwapOption;
3use atomic_shim::{AtomicI64, AtomicU64};
4use metrics_core::Key;
5use metrics_util::StreamingIntegers;
6use quanta::Clock;
7use std::{
8    fmt,
9    ops::Deref,
10    sync::{atomic::Ordering, Arc},
11    time::{Duration, Instant},
12};
13
/// The scope (context) that a metric is registered under.
#[doc(hidden)]
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
pub enum Scope {
    /// The root scope, which contributes no prefix to a metric name.
    Root,

    /// A nested scope of arbitrary depth; parts are kept in the order they are joined.
    Nested(Vec<String>),
}

impl Scope {
    /// Consumes this scope and returns a nested scope with `part` appended as the last part.
    ///
    /// Calling this on [`Scope::Root`] yields a nested scope containing only `part`.
    pub fn add_part<S>(self, part: S) -> Self
    where
        S: Into<String>,
    {
        // Root simply starts from an empty list; both cases then share the push.
        let mut parts = match self {
            Scope::Root => Vec::new(),
            Scope::Nested(existing) => existing,
        };
        parts.push(part.into());
        Scope::Nested(parts)
    }

    /// Consumes this scope and renders the fully-qualified, dot-separated name for `name`.
    ///
    /// For the root scope, `name` is returned unchanged.  For a nested scope, `name` is appended
    /// after the scope parts and everything is joined with `.`, so an empty `name` under a
    /// non-empty nested scope leaves a trailing `.`.
    pub(crate) fn into_string<S>(self, name: S) -> String
    where
        S: Into<String>,
    {
        let name = name.into();

        if let Scope::Nested(mut parts) = self {
            parts.push(name);
            return parts.join(".");
        }

        name
    }
}
53
/// Crate-internal handle that stands in for a scope; it is a plain `u64`.
///
/// NOTE(review): how handle values are assigned and mapped back to a [`Scope`] is not visible in
/// this file -- confirm against the code that issues them.
pub(crate) type ScopeHandle = u64;
55
/// The kind of metric an `Identifier` refers to.
///
/// The variants correspond by name to the variants of `ValueState`.
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
pub(crate) enum Kind {
    /// A counter metric.
    Counter,
    /// A gauge metric.
    Gauge,
    /// A histogram metric.
    Histogram,
    /// A proxy metric.
    Proxy,
}
63
64#[derive(PartialEq, Eq, Hash, Clone, Debug)]
65pub(crate) struct Identifier(Key, ScopeHandle, Kind);
66
67impl Identifier {
68    pub fn new<K>(key: K, handle: ScopeHandle, kind: Kind) -> Self
69    where
70        K: Into<Key>,
71    {
72        Identifier(key.into(), handle, kind)
73    }
74
75    pub fn kind(&self) -> Kind {
76        self.2.clone()
77    }
78
79    pub fn into_parts(self) -> (Key, ScopeHandle, Kind) {
80        (self.0, self.1, self.2)
81    }
82}
83
/// Backing storage for a single metric value, with one variant per kind of metric.
#[derive(Debug)]
enum ValueState {
    /// Counter value, stored as an atomic unsigned 64-bit integer.
    Counter(AtomicU64),
    /// Gauge value, stored as an atomic signed 64-bit integer.
    Gauge(AtomicI64),
    /// Histogram values, stored in a windowed histogram.
    Histogram(AtomicWindowedHistogram),
    /// Optional, atomically swappable function that produces measurements when called.
    Proxy(ArcSwapOption<Box<ProxyFn>>),
}
91
/// The result of snapshotting a `ValueHandle`.
#[derive(Debug)]
pub(crate) enum ValueSnapshot {
    /// A single measurement; produced for counters, gauges, and histograms.
    Single(Measurement),
    /// Zero or more keyed measurements; produced for proxies.
    Multiple(Vec<(Key, Measurement)>),
}
97
/// A point-in-time metric measurement.
///
/// Each variant carries the value captured for one kind of metric.
#[derive(Debug)]
pub enum Measurement {
    /// Counters represent a single value that can only ever be incremented over time, or reset to
    /// zero.
    ///
    /// The captured value is an unsigned 64-bit integer.
    Counter(u64),
    /// Gauges represent a single value that can go up _or_ down over time.
    ///
    /// The captured value is a signed 64-bit integer.
    Gauge(i64),
    /// Histograms measure the distribution of values for a given set of measurements.
    ///
    /// Histograms are slightly special in our case because we want to maintain full fidelity of
    /// the underlying dataset.  We do this by storing all of the individual data points, but we
    /// use [`StreamingIntegers`] to store them in a compressed in-memory form.  This allows
    /// callers to pass around the compressed dataset and decompress/access the actual integers on
    /// demand.
    Histogram(StreamingIntegers),
}
115
116#[derive(Clone, Debug)]
117/// Handle to the underlying measurement for a metric.
118pub(crate) struct ValueHandle {
119    state: Arc<ValueState>,
120}
121
122impl ValueHandle {
123    fn new(state: ValueState) -> Self {
124        ValueHandle {
125            state: Arc::new(state),
126        }
127    }
128
129    pub fn counter() -> Self {
130        Self::new(ValueState::Counter(AtomicU64::new(0)))
131    }
132
133    pub fn gauge() -> Self {
134        Self::new(ValueState::Gauge(AtomicI64::new(0)))
135    }
136
137    pub fn histogram(window: Duration, granularity: Duration, clock: Clock) -> Self {
138        Self::new(ValueState::Histogram(AtomicWindowedHistogram::new(
139            window,
140            granularity,
141            clock,
142        )))
143    }
144
145    pub fn proxy() -> Self {
146        Self::new(ValueState::Proxy(ArcSwapOption::new(None)))
147    }
148
149    pub fn update_counter(&self, value: u64) {
150        match self.state.deref() {
151            ValueState::Counter(inner) => {
152                inner.fetch_add(value, Ordering::Release);
153            }
154            _ => unreachable!("tried to access as counter, not a counter"),
155        }
156    }
157
158    pub fn update_gauge(&self, value: i64) {
159        match self.state.deref() {
160            ValueState::Gauge(inner) => inner.store(value, Ordering::Release),
161            _ => unreachable!("tried to access as gauge, not a gauge"),
162        }
163    }
164
165    pub fn increment_gauge(&self, value: i64) {
166        match self.state.deref() {
167            ValueState::Gauge(inner) => inner.fetch_add(value, Ordering::Release),
168            _ => unreachable!("tried to access as gauge, not a gauge"),
169        };
170    }
171
172    pub fn decrement_gauge(&self, value: i64) {
173        match self.state.deref() {
174            ValueState::Gauge(inner) => inner.fetch_sub(value, Ordering::Release),
175            _ => unreachable!("tried to access as gauge, not a gauge"),
176        };
177    }
178
179    pub fn update_histogram(&self, value: u64) {
180        match self.state.deref() {
181            ValueState::Histogram(inner) => inner.record(value),
182            _ => unreachable!("tried to access as histogram, not a histogram"),
183        }
184    }
185
186    pub fn update_proxy<F>(&self, value: F)
187    where
188        F: Fn() -> Vec<(Key, Measurement)> + Send + Sync + 'static,
189    {
190        match self.state.deref() {
191            ValueState::Proxy(inner) => {
192                inner.store(Some(Arc::new(Box::new(value))));
193            }
194            _ => unreachable!("tried to access as proxy, not a proxy"),
195        }
196    }
197
198    pub fn snapshot(&self) -> ValueSnapshot {
199        match self.state.deref() {
200            ValueState::Counter(inner) => {
201                let value = inner.load(Ordering::Acquire);
202                ValueSnapshot::Single(Measurement::Counter(value))
203            }
204            ValueState::Gauge(inner) => {
205                let value = inner.load(Ordering::Acquire);
206                ValueSnapshot::Single(Measurement::Gauge(value))
207            }
208            ValueState::Histogram(inner) => {
209                let stream = inner.snapshot();
210                ValueSnapshot::Single(Measurement::Histogram(stream))
211            }
212            ValueState::Proxy(maybe) => {
213                let measurements = match *maybe.load() {
214                    None => Vec::new(),
215                    Some(ref f) => f(),
216                };
217
218                ValueSnapshot::Multiple(measurements)
219            }
220        }
221    }
222}
223
/// Trait for types that represent time and can be subtracted from each other to generate a delta.
pub trait Delta {
    /// Get the delta between this value and another value.
    ///
    /// For `Instant`, we explicitly return the nanosecond difference, which is zero when `other`
    /// is later than `self` and is capped at `u64::MAX`.  For `u64`, we return the integer
    /// difference, but the timescale itself can be whatever the user desires.
    fn delta(&self, other: Self) -> u64;
}

impl Delta for u64 {
    /// Returns `self - other` using wrapping arithmetic, so an `other` larger than `self` wraps
    /// around instead of panicking.
    fn delta(&self, other: u64) -> u64 {
        self.wrapping_sub(other)
    }
}

impl Delta for Instant {
    /// Returns the number of nanoseconds from `other` to `self`.
    ///
    /// Returns zero when `other` is later than `self`, and `u64::MAX` when the difference does
    /// not fit in a `u64`.
    fn delta(&self, other: Instant) -> u64 {
        // `Instant - Instant` has had version-dependent behaviour (panic vs. saturate) when
        // `other` is later than `self`; the saturating form makes the zero result explicit.
        let elapsed = self.saturating_duration_since(other);

        // `as_nanos` yields a `u128`; clamp rather than silently truncating the high bits.
        let nanos = elapsed.as_nanos();
        if nanos > u128::from(u64::MAX) {
            u64::MAX
        } else {
            nanos as u64
        }
    }
}
245
/// Helper trait that is automatically implemented for every `Fn() -> Vec<(Key, Measurement)>`.
///
/// It adds no methods of its own; it serves as the base trait of the [`ProxyFn`] trait object.
/// NOTE(review): presumably this exists so that the trait object is a crate-local type that can
/// be given a `Debug` impl -- confirm.
pub trait ProxyFnInner: Fn() -> Vec<(Key, Measurement)> {}
impl<F> ProxyFnInner for F where F: Fn() -> Vec<(Key, Measurement)> {}
248
/// Type-erased proxy function: a `Send + Sync + 'static` trait object that, when called, returns
/// a list of keyed measurements.
pub type ProxyFn = dyn ProxyFnInner<Output = Vec<(Key, Measurement)>> + Send + Sync + 'static;
250
251impl fmt::Debug for ProxyFn {
252    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
253        write!(f, "ProxyFn")
254    }
255}
256
// Unit tests for scope-to-string rendering and for the update/snapshot behaviour of every kind
// of value handle.
#[cfg(test)]
mod tests {
    use super::{Measurement, Scope, ValueHandle, ValueSnapshot};
    use metrics_core::Key;
    use quanta::Clock;
    use std::borrow::Cow;
    use std::time::Duration;

    // Checks how `Scope::into_string` renders names for root and nested scopes, including
    // scopes built up through `Scope::add_part`.
    #[test]
    fn test_metric_scope() {
        // Root scope: the name is returned untouched, even when empty.
        let root_scope = Scope::Root;
        assert_eq!(root_scope.into_string(""), "".to_string());

        let root_scope = Scope::Root;
        assert_eq!(root_scope.into_string("jambalaya"), "jambalaya".to_string());

        // A nested scope with no parts renders the same as the root scope.
        let nested_scope = Scope::Nested(vec![]);
        assert_eq!(nested_scope.into_string(""), "".to_string());

        let nested_scope = Scope::Nested(vec![]);
        assert_eq!(nested_scope.into_string("toilet"), "toilet".to_string());

        // Parts and name are joined with dots, parts first.
        let nested_scope = Scope::Nested(vec!["chamber".to_string(), "of".to_string()]);
        assert_eq!(
            nested_scope.into_string("secrets"),
            "chamber.of.secrets".to_string()
        );

        let nested_scope = Scope::Nested(vec![
            "chamber".to_string(),
            "of".to_string(),
            "secrets".to_string(),
        ]);
        assert_eq!(
            nested_scope.into_string("toilet"),
            "chamber.of.secrets.toilet".to_string()
        );

        // `add_part` accepts anything convertible into a `String`; an empty name under a
        // non-empty nested scope leaves a trailing dot.
        let mut nested_scope = Scope::Root;
        nested_scope = nested_scope
            .add_part("chamber")
            .add_part("of".to_string())
            .add_part(Cow::Borrowed("secrets"));
        assert_eq!(
            nested_scope.into_string(""),
            "chamber.of.secrets.".to_string()
        );

        // `add_part` on an already nested scope appends after the existing parts.
        let mut nested_scope = Scope::Nested(vec![
            "chamber".to_string(),
            "of".to_string(),
            "secrets".to_string(),
        ]);
        nested_scope = nested_scope.add_part("part");
        assert_eq!(
            nested_scope.into_string("two"),
            "chamber.of.secrets.part.two".to_string()
        );
    }

    // Checks that each kind of `ValueHandle` reports, through `snapshot`, what was written to it.
    #[test]
    fn test_metric_values() {
        // Counter: starts at zero, so a single update of 42 reads back as 42.
        let counter = ValueHandle::counter();
        counter.update_counter(42);
        match counter.snapshot() {
            ValueSnapshot::Single(Measurement::Counter(value)) => assert_eq!(value, 42),
            _ => panic!("incorrect value snapshot type for counter"),
        }

        // Gauge: set to 23, plus 20, minus 1, reads back as 42.
        let gauge = ValueHandle::gauge();
        gauge.update_gauge(23);
        gauge.increment_gauge(20);
        gauge.decrement_gauge(1);
        match gauge.snapshot() {
            ValueSnapshot::Single(Measurement::Gauge(value)) => assert_eq!(value, 42),
            _ => panic!("incorrect value snapshot type for gauge"),
        }

        // Histogram: driven by a mock clock; both recorded values come back, in recording order.
        let (mock, _) = Clock::mock();
        let histogram =
            ValueHandle::histogram(Duration::from_secs(10), Duration::from_secs(1), mock);
        histogram.update_histogram(8675309);
        histogram.update_histogram(5551212);
        match histogram.snapshot() {
            ValueSnapshot::Single(Measurement::Histogram(stream)) => {
                assert_eq!(stream.len(), 2);

                let values = stream.decompress();
                assert_eq!(&values[..], [8675309, 5551212]);
            }
            _ => panic!("incorrect value snapshot type for histogram"),
        }

        // Proxy: the snapshot contains exactly what the installed function returns.
        let proxy = ValueHandle::proxy();
        proxy.update_proxy(|| vec![(Key::from_name("foo"), Measurement::Counter(23))]);
        match proxy.snapshot() {
            ValueSnapshot::Multiple(mut measurements) => {
                assert_eq!(measurements.len(), 1);

                let measurement = measurements.pop().expect("should have measurement");
                assert_eq!(measurement.0.name().as_ref(), "foo");
                match measurement.1 {
                    Measurement::Counter(i) => assert_eq!(i, 23),
                    _ => panic!("wrong measurement type"),
                }
            }
            _ => panic!("incorrect value snapshot type for proxy"),
        }

        // Installing a second function must replace the first one: the next snapshot has to
        // contain only the new function's output.
        proxy.update_proxy(|| vec![(Key::from_name("bar"), Measurement::Counter(24))]);
        match proxy.snapshot() {
            ValueSnapshot::Multiple(mut measurements) => {
                assert_eq!(measurements.len(), 1);

                let measurement = measurements.pop().expect("should have measurement");
                assert_eq!(measurement.0.name().as_ref(), "bar");
                match measurement.1 {
                    Measurement::Counter(i) => assert_eq!(i, 24),
                    _ => panic!("wrong measurement type"),
                }
            }
            _ => panic!("incorrect value snapshot type for proxy"),
        }
    }
}