1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use crate::{
    builder::{Builder, BuilderError},
    common::Scope,
    config::Configuration,
    control::Controller,
    registry::{MetricRegistry, ScopeRegistry},
    sink::Sink,
};
use metrics::Recorder;
use metrics_core::Key;
use quanta::{Builder as UpkeepBuilder, Clock, Handle as UpkeepHandle};
use std::{cell::RefCell, sync::Arc};

// Per-thread slot holding the `Sink` used by the `Recorder` implementation below.  It starts out
// as `None` and is filled in the first time a thread records a metric through the facade, so
// each thread ends up with its own `Sink`.
thread_local! {
    static SINK: RefCell<Option<Sink>> = RefCell::new(None);
}

/// Central store for metrics.
///
/// `Receiver` is the nucleus for all metrics operations.  While no operations are performed by it
/// directly, it holds the registries and references to resources and so it must live as long as
/// any [`Sink`] or [`Controller`] does.
pub struct Receiver {
    /// Metric registry; a clone of this `Arc` is handed to every sink and controller.
    metric_registry: Arc<MetricRegistry>,
    /// Scope registry; a clone of this `Arc` is handed to every sink and controller, and to the
    /// metric registry at construction time.
    scope_registry: Arc<ScopeRegistry>,
    /// Clock cloned into each sink and into the metric registry.
    clock: Clock,
    /// Handle obtained when the quanta upkeep thread is started.  It is never read; it is stored
    /// so that it lives as long as the receiver.
    // NOTE(review): presumably dropping this handle stops the upkeep thread -- confirm against
    // the quanta documentation.
    _upkeep_handle: UpkeepHandle,
}

impl Receiver {
    /// Builds a `Receiver` from the supplied configuration.
    ///
    /// This creates the clock, starts the quanta upkeep thread using
    /// `config.upkeep_interval`, and then constructs the scope and metric registries.
    ///
    /// # Errors
    ///
    /// Returns [`BuilderError::UpkeepFailure`] if the upkeep thread fails to start.
    pub(crate) fn from_config(config: Configuration) -> Result<Receiver, BuilderError> {
        // Create the clock and start quanta's upkeep thread for it.  The upkeep thread refreshes
        // a cached time value so that it stays within `upkeep_interval` of the true time.
        // Reading that cached value is faster than querying the underlying time source
        // directly, and histogram windowing can tolerate a much coarser granularity than the
        // raw nanosecond precision quanta provides by default.
        let time_source = Clock::new();
        let upkeep_guard =
            UpkeepBuilder::new_with_clock(config.upkeep_interval, time_source.clone())
                .start()
                .map_err(|_| BuilderError::UpkeepFailure)?;

        // The metric registry shares the scope registry, so the scope registry is built first.
        let scope_store = Arc::new(ScopeRegistry::new());
        let metric_store = Arc::new(MetricRegistry::new(
            Arc::clone(&scope_store),
            config,
            time_source.clone(),
        ));

        Ok(Self {
            metric_registry: metric_store,
            scope_registry: scope_store,
            clock: time_source,
            _upkeep_handle: upkeep_guard,
        })
    }

    /// Returns a fresh [`Builder`], in its default state, for constructing a [`Receiver`].
    pub fn builder() -> Builder {
        Builder::default()
    }

    /// Consumes this receiver and installs it as the global recorder for the `metrics` facade.
    ///
    /// # Panics
    ///
    /// Panics if `metrics::set_boxed_recorder` returns an error.
    // NOTE(review): presumably that happens when a recorder has already been installed --
    // confirm against the `metrics` crate documentation.
    pub fn install(self) {
        let recorder = Box::new(self);
        metrics::set_boxed_recorder(recorder).unwrap();
    }

    /// Creates a new root-scoped [`Sink`] that shares this receiver's registries and clock.
    pub fn sink(&self) -> Sink {
        let metric_store = Arc::clone(&self.metric_registry);
        let scope_store = Arc::clone(&self.scope_registry);
        let time_source = self.clock.clone();

        Sink::new(metric_store, scope_store, Scope::Root, time_source)
    }

    /// Creates a new [`Controller`] that shares this receiver's registries.
    pub fn controller(&self) -> Controller {
        let metric_store = Arc::clone(&self.metric_registry);
        let scope_store = Arc::clone(&self.scope_registry);

        Controller::new(metric_store, scope_store)
    }
}

/// `Recorder` implementation that forwards every facade call to a thread-local [`Sink`].
///
/// Each method looks up the calling thread's `SINK` slot, creates a sink via
/// [`Receiver::sink`] if the slot is still empty, and then forwards the call to that sink.
/// The slot is not keyed by receiver: once a thread has a sink, later calls on that thread
/// reuse it regardless of which `Receiver` they are made on.
///
/// # Panics
///
/// Each method mutably borrows the thread-local `RefCell`, so a call that re-enters the
/// recorder on the same thread while a borrow is active will panic.
impl Recorder for Receiver {
    /// Adds `value` to the counter identified by `key` via the thread-local sink.
    fn increment_counter(&self, key: Key, value: u64) {
        SINK.with(move |cell| {
            // `get_or_insert_with` creates the sink on first use and hands back a mutable
            // reference, so no separate emptiness check or `unwrap` is needed.
            cell.borrow_mut()
                .get_or_insert_with(|| self.sink())
                .increment_counter(key, value);
        });
    }

    /// Sets the gauge identified by `key` to `value` via the thread-local sink.
    fn update_gauge(&self, key: Key, value: i64) {
        SINK.with(move |cell| {
            cell.borrow_mut()
                .get_or_insert_with(|| self.sink())
                .update_gauge(key, value);
        });
    }

    /// Records `value` for the histogram identified by `key` via the thread-local sink.
    ///
    /// Note that this forwards to the sink's `record_value` method.
    fn record_histogram(&self, key: Key, value: u64) {
        SINK.with(move |cell| {
            cell.borrow_mut()
                .get_or_insert_with(|| self.sink())
                .record_value(key, value);
        });
    }
}