metrics_runtime/
receiver.rs1use crate::{
2    builder::{Builder, BuilderError},
3    common::Scope,
4    config::Configuration,
5    control::Controller,
6    registry::{MetricRegistry, ScopeRegistry},
7    sink::Sink,
8};
9use metrics::Recorder;
10use metrics_core::Key;
11use quanta::{Builder as UpkeepBuilder, Clock, Handle as UpkeepHandle};
12use std::{cell::RefCell, sync::Arc};
13
14thread_local! {
15    static SINK: RefCell<Option<Sink>> = RefCell::new(None);
16}
17
18pub struct Receiver {
24    metric_registry: Arc<MetricRegistry>,
25    scope_registry: Arc<ScopeRegistry>,
26    clock: Clock,
27    _upkeep_handle: UpkeepHandle,
28}
29
30impl Receiver {
31    pub(crate) fn from_config(config: Configuration) -> Result<Receiver, BuilderError> {
32        let clock = Clock::new();
38        let upkeep = UpkeepBuilder::new_with_clock(config.upkeep_interval, clock.clone());
39        let _upkeep_handle = upkeep.start().map_err(|_| BuilderError::UpkeepFailure)?;
40
41        let scope_registry = Arc::new(ScopeRegistry::new());
42        let metric_registry = Arc::new(MetricRegistry::new(
43            scope_registry.clone(),
44            config,
45            clock.clone(),
46        ));
47
48        Ok(Receiver {
49            metric_registry,
50            scope_registry,
51            clock,
52            _upkeep_handle,
53        })
54    }
55
56    pub fn builder() -> Builder {
58        Builder::default()
59    }
60
61    pub fn install(self) {
63        metrics::set_boxed_recorder(Box::new(self)).unwrap();
64    }
65
66    pub fn sink(&self) -> Sink {
68        Sink::new(
69            self.metric_registry.clone(),
70            self.scope_registry.clone(),
71            Scope::Root,
72            self.clock.clone(),
73        )
74    }
75
76    pub fn controller(&self) -> Controller {
78        Controller::new(self.metric_registry.clone(), self.scope_registry.clone())
79    }
80}
81
82impl Recorder for Receiver {
83    fn increment_counter(&self, key: Key, value: u64) {
84        SINK.with(move |sink| {
85            let mut sink = sink.borrow_mut();
86            if sink.is_none() {
87                let new_sink = self.sink();
88                *sink = Some(new_sink);
89            }
90
91            sink.as_mut().unwrap().increment_counter(key, value);
92        });
93    }
94
95    fn update_gauge(&self, key: Key, value: i64) {
96        SINK.with(move |sink| {
97            let mut sink = sink.borrow_mut();
98            if sink.is_none() {
99                let new_sink = self.sink();
100                *sink = Some(new_sink);
101            }
102
103            sink.as_mut().unwrap().update_gauge(key, value);
104        });
105    }
106
107    fn record_histogram(&self, key: Key, value: u64) {
108        SINK.with(move |sink| {
109            let mut sink = sink.borrow_mut();
110            if sink.is_none() {
111                let new_sink = self.sink();
112                *sink = Some(new_sink);
113            }
114
115            sink.as_mut().unwrap().record_value(key, value);
116        });
117    }
118}