1use crate::data::AtomicWindowedHistogram;
2use arc_swap::ArcSwapOption;
3use atomic_shim::{AtomicI64, AtomicU64};
4use metrics_core::Key;
5use metrics_util::StreamingIntegers;
6use quanta::Clock;
7use std::{
8 fmt,
9 ops::Deref,
10 sync::{atomic::Ordering, Arc},
11 time::{Duration, Instant},
12};
13
/// A metric scope: either the root, or an ordered list of nested parts.
#[doc(hidden)]
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
pub enum Scope {
    /// The root scope; metric names are rendered without any prefix.
    Root,

    /// A nested scope; its parts are joined with `.` ahead of the metric name.
    Nested(Vec<String>),
}

impl Scope {
    /// Consumes this scope and returns a scope that is one level deeper.
    ///
    /// Adding a part to [`Scope::Root`] yields a [`Scope::Nested`] holding only that
    /// part; adding to a nested scope appends the part after the existing ones.
    pub fn add_part<S>(self, part: S) -> Self
    where
        S: Into<String>,
    {
        // Start from the parts we already have (none for the root), then append.
        let mut parts = match self {
            Scope::Root => Vec::new(),
            Scope::Nested(existing) => existing,
        };
        parts.push(part.into());
        Scope::Nested(parts)
    }

    /// Consumes this scope and renders `name` qualified by it.
    ///
    /// For the root scope the name is returned unchanged. For a nested scope the
    /// name is appended to the parts and everything is joined with `.`, so an empty
    /// name under a non-empty nested scope produces a trailing `.`.
    pub(crate) fn into_string<S>(self, name: S) -> String
    where
        S: Into<String>,
    {
        let name = name.into();
        if let Scope::Nested(mut parts) = self {
            parts.push(name);
            parts.join(".")
        } else {
            name
        }
    }
}
53
/// Numeric handle stored inside an `Identifier` next to the metric key.
// NOTE(review): presumably an index into a scope registry defined elsewhere — confirm.
pub(crate) type ScopeHandle = u64;
55
/// The kind of metric an `Identifier` refers to.
///
/// The variants mirror the variants of `ValueState`.
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
pub(crate) enum Kind {
    /// A counter metric.
    Counter,
    /// A gauge metric.
    Gauge,
    /// A histogram metric.
    Histogram,
    /// A proxy metric.
    Proxy,
}
63
/// Identifies a metric by its key, its scope handle, and its kind (in that order).
///
/// Derives `Eq` and `Hash`, so it can be used as a map key.
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
pub(crate) struct Identifier(Key, ScopeHandle, Kind);
66
67impl Identifier {
68 pub fn new<K>(key: K, handle: ScopeHandle, kind: Kind) -> Self
69 where
70 K: Into<Key>,
71 {
72 Identifier(key.into(), handle, kind)
73 }
74
75 pub fn kind(&self) -> Kind {
76 self.2.clone()
77 }
78
79 pub fn into_parts(self) -> (Key, ScopeHandle, Kind) {
80 (self.0, self.1, self.2)
81 }
82}
83
/// Backing storage for a single metric value, with one variant per metric kind.
#[derive(Debug)]
enum ValueState {
    /// Counter value; added to atomically by `ValueHandle::update_counter`.
    Counter(AtomicU64),
    /// Gauge value; set, incremented, or decremented atomically by the gauge methods
    /// on `ValueHandle`.
    Gauge(AtomicI64),
    /// Windowed histogram that receives values from `ValueHandle::update_histogram`.
    Histogram(AtomicWindowedHistogram),
    /// Optional, swappable proxy function; `None` until `ValueHandle::update_proxy`
    /// installs one.
    Proxy(ArcSwapOption<Box<ProxyFn>>),
}
91
/// The result of `ValueHandle::snapshot`.
#[derive(Debug)]
pub(crate) enum ValueSnapshot {
    /// One measurement; produced for counters, gauges, and histograms.
    Single(Measurement),
    /// Zero or more keyed measurements; produced for proxies (empty when no proxy
    /// function has been installed).
    Multiple(Vec<(Key, Measurement)>),
}
97
/// A single metric reading, as captured by a snapshot or returned by a proxy function.
#[derive(Debug)]
pub enum Measurement {
    /// The value of a counter.
    Counter(u64),
    /// The value of a gauge.
    Gauge(i64),
    /// The values of a histogram, held as a `StreamingIntegers`.
    Histogram(StreamingIntegers),
}
115
/// Handle to the state of a single metric.
///
/// The state lives behind an `Arc`, so cloning a handle yields another handle to
/// the same underlying state.
#[derive(Clone, Debug)]
pub(crate) struct ValueHandle {
    // Shared metric state; the variant is fixed when the handle is constructed.
    state: Arc<ValueState>,
}
121
122impl ValueHandle {
123 fn new(state: ValueState) -> Self {
124 ValueHandle {
125 state: Arc::new(state),
126 }
127 }
128
129 pub fn counter() -> Self {
130 Self::new(ValueState::Counter(AtomicU64::new(0)))
131 }
132
133 pub fn gauge() -> Self {
134 Self::new(ValueState::Gauge(AtomicI64::new(0)))
135 }
136
137 pub fn histogram(window: Duration, granularity: Duration, clock: Clock) -> Self {
138 Self::new(ValueState::Histogram(AtomicWindowedHistogram::new(
139 window,
140 granularity,
141 clock,
142 )))
143 }
144
145 pub fn proxy() -> Self {
146 Self::new(ValueState::Proxy(ArcSwapOption::new(None)))
147 }
148
149 pub fn update_counter(&self, value: u64) {
150 match self.state.deref() {
151 ValueState::Counter(inner) => {
152 inner.fetch_add(value, Ordering::Release);
153 }
154 _ => unreachable!("tried to access as counter, not a counter"),
155 }
156 }
157
158 pub fn update_gauge(&self, value: i64) {
159 match self.state.deref() {
160 ValueState::Gauge(inner) => inner.store(value, Ordering::Release),
161 _ => unreachable!("tried to access as gauge, not a gauge"),
162 }
163 }
164
165 pub fn increment_gauge(&self, value: i64) {
166 match self.state.deref() {
167 ValueState::Gauge(inner) => inner.fetch_add(value, Ordering::Release),
168 _ => unreachable!("tried to access as gauge, not a gauge"),
169 };
170 }
171
172 pub fn decrement_gauge(&self, value: i64) {
173 match self.state.deref() {
174 ValueState::Gauge(inner) => inner.fetch_sub(value, Ordering::Release),
175 _ => unreachable!("tried to access as gauge, not a gauge"),
176 };
177 }
178
179 pub fn update_histogram(&self, value: u64) {
180 match self.state.deref() {
181 ValueState::Histogram(inner) => inner.record(value),
182 _ => unreachable!("tried to access as histogram, not a histogram"),
183 }
184 }
185
186 pub fn update_proxy<F>(&self, value: F)
187 where
188 F: Fn() -> Vec<(Key, Measurement)> + Send + Sync + 'static,
189 {
190 match self.state.deref() {
191 ValueState::Proxy(inner) => {
192 inner.store(Some(Arc::new(Box::new(value))));
193 }
194 _ => unreachable!("tried to access as proxy, not a proxy"),
195 }
196 }
197
198 pub fn snapshot(&self) -> ValueSnapshot {
199 match self.state.deref() {
200 ValueState::Counter(inner) => {
201 let value = inner.load(Ordering::Acquire);
202 ValueSnapshot::Single(Measurement::Counter(value))
203 }
204 ValueState::Gauge(inner) => {
205 let value = inner.load(Ordering::Acquire);
206 ValueSnapshot::Single(Measurement::Gauge(value))
207 }
208 ValueState::Histogram(inner) => {
209 let stream = inner.snapshot();
210 ValueSnapshot::Single(Measurement::Histogram(stream))
211 }
212 ValueState::Proxy(maybe) => {
213 let measurements = match *maybe.load() {
214 None => Vec::new(),
215 Some(ref f) => f(),
216 };
217
218 ValueSnapshot::Multiple(measurements)
219 }
220 }
221 }
222}
223
/// Computes the distance between two values of the same type as a `u64`.
pub trait Delta {
    /// Returns how far `self` is ahead of `other`.
    fn delta(&self, other: Self) -> u64;
}

impl Delta for u64 {
    /// Returns `self - other` using wrapping arithmetic, so a smaller `self` wraps
    /// around instead of panicking.
    fn delta(&self, other: u64) -> u64 {
        self.wrapping_sub(other)
    }
}

impl Delta for Instant {
    /// Returns the number of nanoseconds from `other` to `self`.
    ///
    /// Returns zero when `other` is later than `self`, and saturates at `u64::MAX`
    /// if the span does not fit into a `u64`.
    fn delta(&self, other: Instant) -> u64 {
        // `Instant - Instant` has version-dependent behaviour (panic or saturate)
        // when `other` is later; ask for the saturating form explicitly.
        let elapsed = self.saturating_duration_since(other);

        // Build the nanosecond count from the `u64` seconds plus the sub-second
        // remainder, rather than truncating the `u128` from `as_nanos()`.
        elapsed
            .as_secs()
            .saturating_mul(1_000_000_000)
            .saturating_add(u64::from(elapsed.subsec_nanos()))
    }
}
245
/// Marker trait for any `Fn() -> Vec<(Key, Measurement)>`.
// NOTE(review): presumably exists so that a crate-local trait object type (`ProxyFn`)
// can be given a `fmt::Debug` impl — confirm.
pub trait ProxyFnInner: Fn() -> Vec<(Key, Measurement)> {}
// Blanket impl: every function or closure with the matching signature is a `ProxyFnInner`.
impl<F> ProxyFnInner for F where F: Fn() -> Vec<(Key, Measurement)> {}
248
/// Trait object type for a proxy function: a `Send + Sync + 'static` callable that
/// returns a list of keyed measurements.
pub type ProxyFn = dyn ProxyFnInner<Output = Vec<(Key, Measurement)>> + Send + Sync + 'static;
250
251impl fmt::Debug for ProxyFn {
252 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
253 write!(f, "ProxyFn")
254 }
255}
256
#[cfg(test)]
mod tests {
    use super::{Measurement, Scope, ValueHandle, ValueSnapshot};
    use metrics_core::Key;
    use quanta::Clock;
    use std::borrow::Cow;
    use std::time::Duration;

    /// Builds an owned list of scope parts from string slices.
    fn parts(items: &[&str]) -> Vec<String> {
        items.iter().map(|item| item.to_string()).collect()
    }

    /// Asserts that a proxy snapshot holds exactly one counter measurement with the
    /// given key name and value.
    fn assert_single_proxy_counter(snapshot: ValueSnapshot, name: &str, expected: u64) {
        let mut measurements = match snapshot {
            ValueSnapshot::Multiple(measurements) => measurements,
            _ => panic!("incorrect value snapshot type for proxy"),
        };
        assert_eq!(measurements.len(), 1);

        let (key, measurement) = measurements.pop().expect("should have measurement");
        assert_eq!(key.name().as_ref(), name);
        match measurement {
            Measurement::Counter(value) => assert_eq!(value, expected),
            _ => panic!("wrong measurement type"),
        }
    }

    #[test]
    fn test_metric_scope() {
        // (scope, metric name, expected rendering)
        let cases = vec![
            (Scope::Root, "", ""),
            (Scope::Root, "jambalaya", "jambalaya"),
            (Scope::Nested(vec![]), "", ""),
            (Scope::Nested(vec![]), "toilet", "toilet"),
            (
                Scope::Nested(parts(&["chamber", "of"])),
                "secrets",
                "chamber.of.secrets",
            ),
            (
                Scope::Nested(parts(&["chamber", "of", "secrets"])),
                "toilet",
                "chamber.of.secrets.toilet",
            ),
        ];
        for (scope, name, expected) in cases {
            assert_eq!(scope.into_string(name), expected.to_string());
        }

        // Parts may be supplied as any type that converts into a `String`.
        let built_from_root = Scope::Root
            .add_part("chamber")
            .add_part("of".to_string())
            .add_part(Cow::Borrowed("secrets"));
        assert_eq!(
            built_from_root.into_string(""),
            "chamber.of.secrets.".to_string()
        );

        let extended = Scope::Nested(parts(&["chamber", "of", "secrets"])).add_part("part");
        assert_eq!(
            extended.into_string("two"),
            "chamber.of.secrets.part.two".to_string()
        );
    }

    #[test]
    fn test_metric_values() {
        let counter = ValueHandle::counter();
        counter.update_counter(42);
        if let ValueSnapshot::Single(Measurement::Counter(value)) = counter.snapshot() {
            assert_eq!(value, 42);
        } else {
            panic!("incorrect value snapshot type for counter");
        }

        let gauge = ValueHandle::gauge();
        gauge.update_gauge(23);
        gauge.increment_gauge(20);
        gauge.decrement_gauge(1);
        if let ValueSnapshot::Single(Measurement::Gauge(value)) = gauge.snapshot() {
            assert_eq!(value, 42);
        } else {
            panic!("incorrect value snapshot type for gauge");
        }

        let (mock, _) = Clock::mock();
        let window = Duration::from_secs(10);
        let granularity = Duration::from_secs(1);
        let histogram = ValueHandle::histogram(window, granularity, mock);
        histogram.update_histogram(8675309);
        histogram.update_histogram(5551212);
        if let ValueSnapshot::Single(Measurement::Histogram(stream)) = histogram.snapshot() {
            assert_eq!(stream.len(), 2);

            let values = stream.decompress();
            assert_eq!(&values[..], [8675309, 5551212]);
        } else {
            panic!("incorrect value snapshot type for histogram");
        }

        let proxy = ValueHandle::proxy();
        proxy.update_proxy(|| vec![(Key::from_name("foo"), Measurement::Counter(23))]);
        assert_single_proxy_counter(proxy.snapshot(), "foo", 23);

        // Installing a second proxy function replaces the first.
        proxy.update_proxy(|| vec![(Key::from_name("bar"), Measurement::Counter(24))]);
        assert_single_proxy_counter(proxy.snapshot(), "bar", 24);
    }
}