metrics_runtime/
receiver.rs

1use crate::{
2    builder::{Builder, BuilderError},
3    common::Scope,
4    config::Configuration,
5    control::Controller,
6    registry::{MetricRegistry, ScopeRegistry},
7    sink::Sink,
8};
9use metrics::Recorder;
10use metrics_core::Key;
11use quanta::{Builder as UpkeepBuilder, Clock, Handle as UpkeepHandle};
12use std::{cell::RefCell, sync::Arc};
13
14thread_local! {
15    static SINK: RefCell<Option<Sink>> = RefCell::new(None);
16}
17
18/// Central store for metrics.
19///
20/// `Receiver` is the nucleus for all metrics operations.  While no operations are performed by it
21/// directly, it holds the registeries and references to resources and so it must live as long as
22/// any [`Sink`] or [`Controller`] does.
23pub struct Receiver {
24    metric_registry: Arc<MetricRegistry>,
25    scope_registry: Arc<ScopeRegistry>,
26    clock: Clock,
27    _upkeep_handle: UpkeepHandle,
28}
29
30impl Receiver {
31    pub(crate) fn from_config(config: Configuration) -> Result<Receiver, BuilderError> {
32        // Configure our clock and configure the quanta upkeep thread. The upkeep thread does that
33        // for us, and keeps us within `upkeep_interval` of the true time.  The reads of this cache
34        // time are faster than calling the underlying time source directly, and for histogram
35        // windowing, we can afford to have a very granular value compared to the raw nanosecond
36        // precsion provided by quanta by default.
37        let clock = Clock::new();
38        let upkeep = UpkeepBuilder::new_with_clock(config.upkeep_interval, clock.clone());
39        let _upkeep_handle = upkeep.start().map_err(|_| BuilderError::UpkeepFailure)?;
40
41        let scope_registry = Arc::new(ScopeRegistry::new());
42        let metric_registry = Arc::new(MetricRegistry::new(
43            scope_registry.clone(),
44            config,
45            clock.clone(),
46        ));
47
48        Ok(Receiver {
49            metric_registry,
50            scope_registry,
51            clock,
52            _upkeep_handle,
53        })
54    }
55
56    /// Creates a new [`Builder`] for building a [`Receiver`].
57    pub fn builder() -> Builder {
58        Builder::default()
59    }
60
61    /// Installs this receiver as the global metrics facade.
62    pub fn install(self) {
63        metrics::set_boxed_recorder(Box::new(self)).unwrap();
64    }
65
66    /// Creates a [`Sink`] bound to this receiver.
67    pub fn sink(&self) -> Sink {
68        Sink::new(
69            self.metric_registry.clone(),
70            self.scope_registry.clone(),
71            Scope::Root,
72            self.clock.clone(),
73        )
74    }
75
76    /// Creates a [`Controller`] bound to this receiver.
77    pub fn controller(&self) -> Controller {
78        Controller::new(self.metric_registry.clone(), self.scope_registry.clone())
79    }
80}
81
82impl Recorder for Receiver {
83    fn increment_counter(&self, key: Key, value: u64) {
84        SINK.with(move |sink| {
85            let mut sink = sink.borrow_mut();
86            if sink.is_none() {
87                let new_sink = self.sink();
88                *sink = Some(new_sink);
89            }
90
91            sink.as_mut().unwrap().increment_counter(key, value);
92        });
93    }
94
95    fn update_gauge(&self, key: Key, value: i64) {
96        SINK.with(move |sink| {
97            let mut sink = sink.borrow_mut();
98            if sink.is_none() {
99                let new_sink = self.sink();
100                *sink = Some(new_sink);
101            }
102
103            sink.as_mut().unwrap().update_gauge(key, value);
104        });
105    }
106
107    fn record_histogram(&self, key: Key, value: u64) {
108        SINK.with(move |sink| {
109            let mut sink = sink.borrow_mut();
110            if sink.is_none() {
111                let new_sink = self.sink();
112                *sink = Some(new_sink);
113            }
114
115            sink.as_mut().unwrap().record_value(key, value);
116        });
117    }
118}