// Copyright 2014 The Prometheus Authors
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use std::cell::RefCell;
use std::collections::HashMap;
use std::convert::From;
use std::sync::{
    atomic::{AtomicU64 as StdAtomicU64, Ordering},
    Arc, Mutex,
};
use std::time::{Duration, Instant as StdInstant};

use crate::atomic64::{Atomic, AtomicF64, AtomicU64};
use crate::desc::{Desc, Describer};
use crate::errors::{Error, Result};
use crate::metrics::{Collector, LocalMetric, Metric, Opts};
use crate::proto;
use crate::value::make_label_pairs;
use crate::vec::{MetricVec, MetricVecBuilder};

/// The default [`Histogram`] buckets. The default buckets are
/// tailored to broadly measure the response time (in seconds) of a
/// network service. Most likely, however, you will be required to define
/// buckets customized to your use case.
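///
/// If the defaults do not fit, custom buckets can be supplied via
/// [`HistogramOpts::buckets`]; a small sketch (the metric name is
/// illustrative):
///
/// ```
/// use prometheus::{Histogram, HistogramOpts};
///
/// let opts = HistogramOpts::new("db_query_duration_seconds", "DB query durations.")
///     .buckets(vec![0.001, 0.01, 0.1, 1.0]);
/// let histogram = Histogram::with_opts(opts).unwrap();
/// histogram.observe(0.042);
/// ```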
pub const DEFAULT_BUCKETS: &[f64; 11] = &[
    0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
];

/// Used for the label that defines the upper bound of a
/// bucket of a histogram ("le" -> "less or equal").
pub const BUCKET_LABEL: &str = "le";

#[inline]
fn check_bucket_label(label: &str) -> Result<()> {
    if label == BUCKET_LABEL {
        return Err(Error::Msg(
            "`le` is not allowed as label name in histograms".to_owned(),
        ));
    }

    Ok(())
}

fn check_and_adjust_buckets(mut buckets: Vec<f64>) -> Result<Vec<f64>> {
    if buckets.is_empty() {
        buckets = Vec::from(DEFAULT_BUCKETS as &'static [f64]);
    }

    for (i, upper_bound) in buckets.iter().enumerate() {
        if i < (buckets.len() - 1) && *upper_bound >= buckets[i + 1] {
            return Err(Error::Msg(format!(
                "histogram buckets must be in increasing \
                 order: {} >= {}",
                upper_bound,
                buckets[i + 1]
            )));
        }
    }

    let tail = *buckets.last().unwrap();
    if tail.is_sign_positive() && tail.is_infinite() {
        // The +Inf bucket is implicit. Remove it here.
        buckets.pop();
    }

    Ok(buckets)
}

/// A struct that bundles the options for creating a [`Histogram`] metric. It is
/// mandatory to set `name` and `help` to non-empty strings. All other fields are
/// optional and can safely be left at their default values.
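///
/// A minimal builder sketch (the metric name, namespace, and label values are
/// illustrative):
///
/// ```
/// use prometheus::HistogramOpts;
///
/// let opts = HistogramOpts::new("request_duration_seconds", "Request latencies.")
///     .namespace("myapp")
///     .const_label("region", "eu")
///     .buckets(vec![0.05, 0.1, 0.25, 0.5, 1.0]);
/// assert_eq!(opts.fq_name(), "myapp_request_duration_seconds");
/// ```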
#[derive(Clone, Debug)]
pub struct HistogramOpts {
    /// A container holding various options.
    pub common_opts: Opts,

    /// Defines the buckets into which observations are counted. Each
    /// element in the slice is the upper inclusive bound of a bucket. The
    /// values must be sorted in strictly increasing order. There is no need
    /// to add a highest bucket with the +Inf bound, it will be added
    /// implicitly. The default value is [`DEFAULT_BUCKETS`].
    pub buckets: Vec<f64>,
}

impl HistogramOpts {
    /// Create a [`HistogramOpts`] with the `name` and `help` arguments.
    pub fn new<S1: Into<String>, S2: Into<String>>(name: S1, help: S2) -> HistogramOpts {
        HistogramOpts {
            common_opts: Opts::new(name, help),
            buckets: Vec::from(DEFAULT_BUCKETS as &'static [f64]),
        }
    }

    /// `namespace` sets the namespace.
    pub fn namespace<S: Into<String>>(mut self, namespace: S) -> Self {
        self.common_opts.namespace = namespace.into();
        self
    }

    /// `subsystem` sets the subsystem.
    pub fn subsystem<S: Into<String>>(mut self, subsystem: S) -> Self {
        self.common_opts.subsystem = subsystem.into();
        self
    }

    /// `const_labels` sets the const labels.
    pub fn const_labels(mut self, const_labels: HashMap<String, String>) -> Self {
        self.common_opts = self.common_opts.const_labels(const_labels);
        self
    }

    /// `const_label` adds a const label.
    pub fn const_label<S1: Into<String>, S2: Into<String>>(mut self, name: S1, value: S2) -> Self {
        self.common_opts = self.common_opts.const_label(name, value);
        self
    }

    /// `variable_labels` sets the variable labels.
    pub fn variable_labels(mut self, variable_labels: Vec<String>) -> Self {
        self.common_opts = self.common_opts.variable_labels(variable_labels);
        self
    }

    /// `variable_label` adds a variable label.
    pub fn variable_label<S: Into<String>>(mut self, name: S) -> Self {
        self.common_opts = self.common_opts.variable_label(name);
        self
    }

    /// `fq_name` returns the fq_name.
    pub fn fq_name(&self) -> String {
        self.common_opts.fq_name()
    }

    /// `buckets` sets the buckets.
    pub fn buckets(mut self, buckets: Vec<f64>) -> Self {
        self.buckets = buckets;
        self
    }
}

impl Describer for HistogramOpts {
    fn describe(&self) -> Result<Desc> {
        self.common_opts.describe()
    }
}

impl From<Opts> for HistogramOpts {
    fn from(opts: Opts) -> HistogramOpts {
        HistogramOpts {
            common_opts: opts,
            buckets: Vec::from(DEFAULT_BUCKETS as &'static [f64]),
        }
    }
}

/// Representation of a hot or cold shard.
///
/// See [`HistogramCore`] for details.
#[derive(Debug)]
struct Shard {
    sum: AtomicF64,
    count: AtomicU64,
    buckets: Vec<AtomicU64>,
}

impl Shard {
    fn new(num_buckets: usize) -> Self {
        let mut buckets = Vec::new();
        for _ in 0..num_buckets {
            buckets.push(AtomicU64::new(0));
        }

        Shard {
            sum: AtomicF64::new(0.0),
            count: AtomicU64::new(0),
            buckets,
        }
    }
}

/// Index into an array of [`Shard`]s.
///
/// Used in conjunction with [`ShardAndCount`] below.
#[derive(Debug, Clone, Copy)]
enum ShardIndex {
    /// First index. Corresponds to 0.
    First,
    /// Second index. Corresponds to 1.
    Second,
}

impl ShardIndex {
    /// Inverse the given [`ShardIndex`].
    fn inverse(self) -> ShardIndex {
        match self {
            ShardIndex::First => ShardIndex::Second,
            ShardIndex::Second => ShardIndex::First,
        }
    }
}

impl From<u64> for ShardIndex {
    fn from(index: u64) -> Self {
        match index {
            0 => ShardIndex::First,
            1 => ShardIndex::Second,
            _ => panic!(
                "Invalid shard index {:?}. A histogram only has two shards.",
                index
            ),
        }
    }
}

impl From<ShardIndex> for usize {
    fn from(index: ShardIndex) -> Self {
        match index {
            ShardIndex::First => 0,
            ShardIndex::Second => 1,
        }
    }
}

/// An atomic u64 with the most significant bit used as a [`ShardIndex`] and the
/// remaining 63 bits used to count [`Histogram`] observations.
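///
/// For example, a raw value of `(1 << 63) | 5` splits into
/// `(ShardIndex::Second, 5)`, while a raw value of `5` splits into
/// `(ShardIndex::First, 5)`.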
#[derive(Debug)]
struct ShardAndCount {
    inner: StdAtomicU64,
}

impl ShardAndCount {
    /// Return a new [`ShardAndCount`] with both the most significant bit,
    /// i.e. the [`ShardIndex`], and the remaining 63 bits, i.e. the
    /// observation count, set to 0.
    fn new() -> Self {
        ShardAndCount {
            inner: StdAtomicU64::new(0),
        }
    }

    /// Flip the most significant bit, i.e. the [`ShardIndex`], leaving the
    /// remaining 63 bits unchanged. Returns the previous [`ShardIndex`] and
    /// observation count.
    fn flip(&self, ordering: Ordering) -> (ShardIndex, u64) {
        let n = self.inner.fetch_add(1 << 63, ordering);

        ShardAndCount::split_shard_index_and_count(n)
    }

    /// Get the most significant bit i.e. the [`ShardIndex`] as well as the
    /// remaining 63 bits i.e. the observation count.
    fn get(&self) -> (ShardIndex, u64) {
        let n = self.inner.load(Ordering::Relaxed);

        ShardAndCount::split_shard_index_and_count(n)
    }

    /// Increment the observation count leaving the most significant bit i.e.
    /// the [`ShardIndex`] untouched.
    fn inc_by(&self, delta: u64, ordering: Ordering) -> (ShardIndex, u64) {
        let n = self.inner.fetch_add(delta, ordering);

        ShardAndCount::split_shard_index_and_count(n)
    }

    /// Increment the observation count by one leaving the most significant bit
    /// i.e. the [`ShardIndex`] untouched.
    fn inc(&self, ordering: Ordering) -> (ShardIndex, u64) {
        self.inc_by(1, ordering)
    }

    fn split_shard_index_and_count(n: u64) -> (ShardIndex, u64) {
        let shard = n >> 63;
        let count = n & ((1 << 63) - 1);

        (shard.into(), count)
    }
}

/// Core datastructure of a Prometheus histogram
///
/// # Atomicity across collects
///
/// A histogram supports two main execution paths:
///
/// 1. `observe` which increases the overall observation counter, updates the
/// observation sum and increases a single bucket counter.
///
/// 2. `proto` (aka. collecting the metric, from now on referred to as the
/// collect operation) which snapshots the state of the histogram and exposes it
/// as a Protobuf struct.
///
/// If an observe and a collect operation interleave, the latter could expose a
/// snapshot of the histogram that does not uphold all histogram invariants.
/// Take, for example, the invariant that the overall observation counter
/// should equal the sum of all bucket counters: say an `observe` call
/// increases the overall counter, but a collect operation snapshots the
/// histogram before the corresponding bucket counter is updated.
///
/// The below implementation of `HistogramCore` prevents such race conditions by
/// using two shards, one hot shard for `observe` operations to record their
/// observation and one cold shard for collect operations to collect a
/// consistent snapshot of the histogram.
///
/// `observe` operations hit the hot shard and record their observation. Collect
/// operations switch hot and cold, wait for all `observe` calls to finish on
/// the previously hot now cold shard and then expose the consistent snapshot.
#[derive(Debug)]
pub struct HistogramCore {
    desc: Desc,
    label_pairs: Vec<proto::LabelPair>,

    /// Mutual exclusion to serialize collect operations. No two collect
    /// operations should operate on this datastructure at the same time. (See
    /// struct documentation for details.) `observe` operations can operate in
    /// parallel without holding this lock.
    collect_lock: Mutex<()>,

    /// An atomic u64 where the first bit determines the currently hot shard and
    /// the remaining 63 bits determine the overall count.
    shard_and_count: ShardAndCount,
    /// The two shards where `shard_and_count` determines which one is the hot
    /// and which one the cold at any given point in time.
    shards: [Shard; 2],

    upper_bounds: Vec<f64>,
}

impl HistogramCore {
    pub fn new(opts: &HistogramOpts, label_values: &[&str]) -> Result<HistogramCore> {
        let desc = opts.describe()?;

        for name in &desc.variable_labels {
            check_bucket_label(name)?;
        }
        for pair in &desc.const_label_pairs {
            check_bucket_label(pair.get_name())?;
        }

        let label_pairs = make_label_pairs(&desc, label_values)?;

        let buckets = check_and_adjust_buckets(opts.buckets.clone())?;

        Ok(HistogramCore {
            desc,
            label_pairs,

            collect_lock: Mutex::new(()),

            shard_and_count: ShardAndCount::new(),
            shards: [Shard::new(buckets.len()), Shard::new(buckets.len())],

            upper_bounds: buckets,
        })
    }

    /// Record a given observation (f64) in the histogram.
    //
    // First increase the overall observation counter and thus learn which shard
    // is the current hot shard. Subsequently on the hot shard update the
    // corresponding bucket count, adjust the shard's sum and finally increase
    // the shard's count.
    pub fn observe(&self, v: f64) {
        // The collect code path uses `self.shard_and_count` and
        // `self.shards[x].count` to ensure not to collect data from a shard
        // while observe calls are still operating on it.
        //
        // To ensure the above, this `inc` needs to use `Acquire` ordering to
        // force anything below this line to stay below it.
        let (shard_index, _count) = self.shard_and_count.inc(Ordering::Acquire);

        let shard: &Shard = &self.shards[usize::from(shard_index)];

        // Try to find the bucket.
        let mut iter = self
            .upper_bounds
            .iter()
            .enumerate()
            .filter(|&(_, f)| v <= *f);
        if let Some((i, _)) = iter.next() {
            shard.buckets[i].inc_by(1);
        }

        shard.sum.inc_by(v);
        // Use `Release` ordering to ensure all operations above stay above.
        shard.count.inc_by_with_ordering(1, Ordering::Release);
    }

    /// Make a snapshot of the current histogram state exposed as a Protobuf
    /// struct.
    //
    // Acquire the collect lock, switch the hot and the cold shard, wait for all
    // remaining `observe` calls to finish on the previously hot now cold shard,
    // snapshot the data, update the now hot shard and reset the cold shard.
    pub fn proto(&self) -> proto::Histogram {
        let collect_guard = self.collect_lock.lock().expect("Lock poisoned");

        // `flip` needs to use AcqRel ordering to ensure the lock operation
        // above stays above and the histogram operations (especially the shard
        // resets) below stay below.
        let (cold_shard_index, overall_count) = self.shard_and_count.flip(Ordering::AcqRel);

        let cold_shard = &self.shards[usize::from(cold_shard_index)];
        let hot_shard = &self.shards[usize::from(cold_shard_index.inverse())];

        // Wait for all currently active `observe` calls on the now cold shard
        // to finish. The above call to `flip` redirects all future `observe`
        // calls to the other previously cold, now hot, shard. Thus once the
        // cold shard counter equals the value of the global counter when the
        // shards were flipped, all in-progress `observe` calls are done. With
        // all of them done, the cold shard is now in a consistent state.
        //
        // `observe` uses `Release` ordering. `compare_exchange` needs to use
        // `Acquire` ordering to ensure that (1) one sees all the previous
        // `observe` stores to the counter and (2) to ensure the below shard
        // modifications happen after this point, thus the shard is not modified
        // by any `observe` operations.
        while cold_shard
            .count
            .compare_exchange_weak(
                overall_count,
                // While at it, reset cold shard count on success.
                0,
                Ordering::Acquire,
                Ordering::Acquire,
            )
            .is_err()
        {}

        // Get cold shard sum and reset to 0.
        //
        // Use `Acquire` for load and `Release` for store to ensure not to
        // interfere with previous or upcoming collect calls.
        let cold_shard_sum = cold_shard.sum.swap(0.0, Ordering::AcqRel);

        let mut h = proto::Histogram::default();
        h.set_sample_sum(cold_shard_sum);
        h.set_sample_count(overall_count);

        let mut cumulative_count = 0;
        let mut buckets = Vec::with_capacity(self.upper_bounds.len());
        for (i, upper_bound) in self.upper_bounds.iter().enumerate() {
            // Reset the cold shard and update the hot shard.
            //
            // Use `Acquire` for load and `Release` for store to ensure not to
            // interfere with previous or upcoming collect calls.
            let cold_bucket_count = cold_shard.buckets[i].swap(0, Ordering::AcqRel);
            hot_shard.buckets[i].inc_by(cold_bucket_count);

            cumulative_count += cold_bucket_count;
            let mut b = proto::Bucket::default();
            b.set_cumulative_count(cumulative_count);
            b.set_upper_bound(*upper_bound);
            buckets.push(b);
        }
        h.set_bucket(from_vec!(buckets));

        // Update the hot shard.
        hot_shard.count.inc_by(overall_count);
        hot_shard.sum.inc_by(cold_shard_sum);

        drop(collect_guard);

        h
    }

    fn sample_sum(&self) -> f64 {
        // Make sure to not overlap with any collect calls, as they might flip
        // the hot and cold shards.
        let _guard = self.collect_lock.lock().expect("Lock poisoned");

        let (shard_index, _count) = self.shard_and_count.get();
        self.shards[shard_index as usize].sum.get()
    }

    fn sample_count(&self) -> u64 {
        self.shard_and_count.get().1
    }
}

// We have to wrap libc::timespec in order to implement std::fmt::Debug.
#[cfg(all(feature = "nightly", target_os = "linux"))]
pub struct Timespec(libc::timespec);

#[cfg(all(feature = "nightly", target_os = "linux"))]
impl std::fmt::Debug for Timespec {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Timespec {{ tv_sec: {}, tv_nsec: {} }}",
            self.0.tv_sec, self.0.tv_nsec
        )
    }
}

#[derive(Debug)]
pub enum Instant {
    Monotonic(StdInstant),
    #[cfg(all(feature = "nightly", target_os = "linux"))]
    MonotonicCoarse(Timespec),
}

impl Instant {
    pub fn now() -> Instant {
        Instant::Monotonic(StdInstant::now())
    }

    #[cfg(all(feature = "nightly", target_os = "linux"))]
    pub fn now_coarse() -> Instant {
        Instant::MonotonicCoarse(get_time_coarse())
    }

    #[cfg(all(feature = "nightly", not(target_os = "linux")))]
    pub fn now_coarse() -> Instant {
        Instant::Monotonic(StdInstant::now())
    }

    pub fn elapsed(&self) -> Duration {
        match self {
            // We use `saturating_duration_since` to avoid panics caused by non-monotonic clocks.
            Instant::Monotonic(i) => StdInstant::now().saturating_duration_since(*i),

            // Unlike `Instant::Monotonic`, the resolution here is milliseconds.
            // The processors in an SMP system do not all start at exactly the
            // same time, so the timer registers typically run at an offset.
            // Millisecond resolution is used to absorb that error.
            // See more: https://linux.die.net/man/2/clock_gettime
            #[cfg(all(feature = "nightly", target_os = "linux"))]
            Instant::MonotonicCoarse(t) => {
                let now = get_time_coarse();
                let now_ms = now.0.tv_sec * MILLIS_PER_SEC + now.0.tv_nsec / NANOS_PER_MILLI;
                let t_ms = t.0.tv_sec * MILLIS_PER_SEC + t.0.tv_nsec / NANOS_PER_MILLI;
                let dur = now_ms - t_ms;
                if dur >= 0 {
                    Duration::from_millis(dur as u64)
                } else {
                    Duration::from_millis(0)
                }
            }
        }
    }

    #[inline]
    pub fn elapsed_sec(&self) -> f64 {
        duration_to_seconds(self.elapsed())
    }
}

#[cfg(all(feature = "nightly", target_os = "linux"))]
use self::coarse::*;

#[cfg(all(feature = "nightly", target_os = "linux"))]
mod coarse {
    use crate::histogram::Timespec;
    pub use libc::timespec;
    use libc::{clock_gettime, CLOCK_MONOTONIC_COARSE};

    pub const NANOS_PER_MILLI: i64 = 1_000_000;
    pub const MILLIS_PER_SEC: i64 = 1_000;

    pub fn get_time_coarse() -> Timespec {
        let mut t = Timespec(timespec {
            tv_sec: 0,
            tv_nsec: 0,
        });
        assert_eq!(
            unsafe { clock_gettime(CLOCK_MONOTONIC_COARSE, &mut t.0) },
            0
        );
        t
    }
}

/// Timer to measure and record the duration of an event.
///
/// This timer can be stopped and observed at most once, either automatically (when it
/// goes out of scope) or manually.
/// Alternatively, it can be manually stopped and discarded in order to not record its value.
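///
/// A minimal scope-based sketch (the metric name is illustrative):
///
/// ```
/// use prometheus::{Histogram, HistogramOpts};
///
/// let histogram =
///     Histogram::with_opts(HistogramOpts::new("op_duration_seconds", "Op durations.")).unwrap();
/// {
///     // Recorded automatically when the timer goes out of scope.
///     let _timer = histogram.start_timer();
///     // ... timed work ...
/// }
/// assert_eq!(histogram.get_sample_count(), 1);
/// ```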
#[must_use = "Timer should be kept in a variable otherwise it cannot observe duration"]
#[derive(Debug)]
pub struct HistogramTimer {
    /// A histogram for automatic recording of observations.
    histogram: Histogram,
    /// Whether the timer has already been observed once.
    observed: bool,
    /// Starting instant for the timer.
    start: Instant,
}

impl HistogramTimer {
    fn new(histogram: Histogram) -> Self {
        Self {
            histogram,
            observed: false,
            start: Instant::now(),
        }
    }

    #[cfg(feature = "nightly")]
    fn new_coarse(histogram: Histogram) -> Self {
        HistogramTimer {
            histogram,
            observed: false,
            start: Instant::now_coarse(),
        }
    }

    /// Observe and record timer duration (in seconds).
    ///
    /// It observes the floating-point number of seconds elapsed since the timer
    /// started, and it records that value to the attached histogram.
    pub fn observe_duration(self) {
        self.stop_and_record();
    }

    /// Observe, record and return timer duration (in seconds).
    ///
    /// It observes and returns a floating-point number for seconds elapsed since
    /// the timer started, recording that value to the attached histogram.
    pub fn stop_and_record(self) -> f64 {
        let mut timer = self;
        timer.observe(true)
    }

    /// Observe and return timer duration (in seconds).
    ///
    /// It returns a floating-point number of seconds elapsed since the timer started,
    /// without recording to any histogram.
    pub fn stop_and_discard(self) -> f64 {
        let mut timer = self;
        timer.observe(false)
    }

    fn observe(&mut self, record: bool) -> f64 {
        let v = self.start.elapsed_sec();
        self.observed = true;
        if record {
            self.histogram.observe(v);
        }
        v
    }
}

impl Drop for HistogramTimer {
    fn drop(&mut self) {
        if !self.observed {
            self.observe(true);
        }
    }
}

/// A [`Metric`] counts individual observations from an event or sample stream
/// in configurable buckets. Similar to a [`Summary`](crate::proto::Summary),
/// it also provides a sum of observations and an observation count.
///
/// On the Prometheus server, quantiles can be calculated from a [`Histogram`] using
/// the [`histogram_quantile`][1] function in the query language.
///
/// Note that Histograms, in contrast to Summaries, can be aggregated with the
/// Prometheus query language (see [the prometheus documentation][2] for
/// detailed procedures). However, Histograms require the user to pre-define
/// suitable buckets (see [`linear_buckets`] and [`exponential_buckets`] for
/// some helpers provided here), and they are in general less accurate. The
/// `observe` method of a [`Histogram`] has a very low performance overhead
/// compared to the `observe` method of a Summary.
///
/// [1]: https://prometheus.io/docs/prometheus/latest/querying/functions/#histogram_quantile
/// [2]: https://prometheus.io/docs/practices/histograms/
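///
/// A basic usage sketch (the metric name is illustrative):
///
/// ```
/// use prometheus::{Histogram, HistogramOpts};
///
/// let histogram = Histogram::with_opts(HistogramOpts::new(
///     "response_time_seconds",
///     "Response times in seconds.",
/// ))
/// .unwrap();
///
/// histogram.observe(0.42);
/// assert_eq!(histogram.get_sample_count(), 1);
/// ```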
#[derive(Clone, Debug)]
pub struct Histogram {
    core: Arc<HistogramCore>,
}

impl Histogram {
    /// `with_opts` creates a [`Histogram`] with the `opts` options.
    pub fn with_opts(opts: HistogramOpts) -> Result<Histogram> {
        Histogram::with_opts_and_label_values(&opts, &[])
    }

    fn with_opts_and_label_values(
        opts: &HistogramOpts,
        label_values: &[&str],
    ) -> Result<Histogram> {
        let core = HistogramCore::new(opts, label_values)?;

        Ok(Histogram {
            core: Arc::new(core),
        })
    }
}

impl Histogram {
    /// Add a single observation to the [`Histogram`].
    pub fn observe(&self, v: f64) {
        self.core.observe(v)
    }

    /// Return a [`HistogramTimer`] to track a duration.
    pub fn start_timer(&self) -> HistogramTimer {
        HistogramTimer::new(self.clone())
    }

    /// Return a [`HistogramTimer`] to track a duration.
    /// It is faster but less precise.
    #[cfg(feature = "nightly")]
    pub fn start_coarse_timer(&self) -> HistogramTimer {
        HistogramTimer::new_coarse(self.clone())
    }

    /// Observe execution time of a closure, in seconds.
    pub fn observe_closure_duration<F, T>(&self, f: F) -> T
    where
        F: FnOnce() -> T,
    {
        let instant = Instant::now();
        let res = f();
        let elapsed = instant.elapsed_sec();
        self.observe(elapsed);
        res
    }

    /// Observe execution time of a closure, in seconds.
    #[cfg(feature = "nightly")]
    pub fn observe_closure_duration_coarse<F, T>(&self, f: F) -> T
    where
        F: FnOnce() -> T,
    {
        let instant = Instant::now_coarse();
        let res = f();
        let elapsed = instant.elapsed_sec();
        self.observe(elapsed);
        res
    }

    /// Return a [`LocalHistogram`] for single thread usage.
    pub fn local(&self) -> LocalHistogram {
        LocalHistogram::new(self.clone())
    }

    /// Return accumulated sum of all samples.
    pub fn get_sample_sum(&self) -> f64 {
        self.core.sample_sum()
    }

    /// Return count of all samples.
    pub fn get_sample_count(&self) -> u64 {
        self.core.sample_count()
    }
}

impl Metric for Histogram {
    fn metric(&self) -> proto::Metric {
        let mut m = proto::Metric::default();
        m.set_label(from_vec!(self.core.label_pairs.clone()));

        let h = self.core.proto();
        m.set_histogram(h);

        m
    }
}

impl Collector for Histogram {
    fn desc(&self) -> Vec<&Desc> {
        vec![&self.core.desc]
    }

    fn collect(&self) -> Vec<proto::MetricFamily> {
        let mut m = proto::MetricFamily::default();
        m.set_name(self.core.desc.fq_name.clone());
        m.set_help(self.core.desc.help.clone());
        m.set_field_type(proto::MetricType::HISTOGRAM);
        m.set_metric(from_vec!(vec![self.metric()]));

        vec![m]
    }
}

#[derive(Clone, Debug)]
pub struct HistogramVecBuilder {}

impl MetricVecBuilder for HistogramVecBuilder {
    type M = Histogram;
    type P = HistogramOpts;

    fn build(&self, opts: &HistogramOpts, vals: &[&str]) -> Result<Histogram> {
        Histogram::with_opts_and_label_values(opts, vals)
    }
}

/// A [`Collector`] that bundles a set of Histograms that all share the
/// same [`Desc`], but have different values for their variable labels. This is used
/// if you want to count the same thing partitioned by various dimensions
/// (e.g. HTTP request latencies, partitioned by status code and method).
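///
/// A minimal sketch (metric and label names are illustrative):
///
/// ```
/// use prometheus::{HistogramOpts, HistogramVec};
///
/// let vec = HistogramVec::new(
///     HistogramOpts::new("http_request_duration_seconds", "HTTP request latencies."),
///     &["method", "status"],
/// )
/// .unwrap();
///
/// vec.with_label_values(&["GET", "200"]).observe(0.23);
/// ```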
pub type HistogramVec = MetricVec<HistogramVecBuilder>;

impl HistogramVec {
    /// Create a new [`HistogramVec`] based on the provided
    /// [`HistogramOpts`] and partitioned by the given label names. At least
    /// one label name must be provided.
    pub fn new(opts: HistogramOpts, label_names: &[&str]) -> Result<HistogramVec> {
        let variable_names = label_names.iter().map(|s| (*s).to_owned()).collect();
        let opts = opts.variable_labels(variable_names);
        let metric_vec =
            MetricVec::create(proto::MetricType::HISTOGRAM, HistogramVecBuilder {}, opts)?;

        Ok(metric_vec as HistogramVec)
    }

    /// Return a `LocalHistogramVec` for single thread usage.
    pub fn local(&self) -> LocalHistogramVec {
        let vec = self.clone();
        LocalHistogramVec::new(vec)
    }
}

/// Create `count` buckets, each `width` wide, where the lowest
/// bucket has an upper bound of `start`. The final +Inf bucket is not counted
/// and not included in the returned slice. The returned slice is meant to be
/// used for the Buckets field of [`HistogramOpts`].
///
/// The function returns an error if `count` is zero or `width` is zero or
/// negative.
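///
/// A small sketch, assuming the usual crate-root re-export:
///
/// ```
/// use prometheus::linear_buckets;
///
/// assert_eq!(linear_buckets(0.0, 0.5, 4).unwrap(), vec![0.0, 0.5, 1.0, 1.5]);
/// ```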
pub fn linear_buckets(start: f64, width: f64, count: usize) -> Result<Vec<f64>> {
    if count < 1 {
        return Err(Error::Msg(format!(
            "LinearBuckets needs a positive count, count: {}",
            count
        )));
    }
    if width <= 0.0 {
        return Err(Error::Msg(format!(
            "LinearBuckets needs a width greater then 0, width: {}",
            width
        )));
    }

    let buckets: Vec<_> = (0..count)
        .map(|step| start + width * (step as f64))
        .collect();

    Ok(buckets)
}

/// Create `count` buckets, where the lowest bucket has an
/// upper bound of `start` and each following bucket's upper bound is `factor`
/// times the previous bucket's upper bound. The final +Inf bucket is not counted
/// and not included in the returned slice. The returned slice is meant to be
/// used for the Buckets field of [`HistogramOpts`].
///
/// The function returns an error if `count` is zero, if `start` is zero or
/// negative, or if `factor` is less than or equal to 1.
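///
/// A small sketch, assuming the usual crate-root re-export:
///
/// ```
/// use prometheus::exponential_buckets;
///
/// assert_eq!(exponential_buckets(1.0, 2.0, 4).unwrap(), vec![1.0, 2.0, 4.0, 8.0]);
/// ```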
pub fn exponential_buckets(start: f64, factor: f64, count: usize) -> Result<Vec<f64>> {
    if count < 1 {
        return Err(Error::Msg(format!(
            "exponential_buckets needs a positive count, count: {}",
            count
        )));
    }
    if start <= 0.0 {
        return Err(Error::Msg(format!(
            "exponential_buckets needs a positive start value, \
             start: {}",
            start
        )));
    }
    if factor <= 1.0 {
        return Err(Error::Msg(format!(
            "exponential_buckets needs a factor greater than 1, \
             factor: {}",
            factor
        )));
    }

    let mut next = start;
    let mut buckets = Vec::with_capacity(count);
    for _ in 0..count {
        buckets.push(next);
        next *= factor;
    }

    Ok(buckets)
}

/// `duration_to_seconds` converts Duration to seconds.
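///
/// For example, `duration_to_seconds(Duration::from_millis(1500))` is `1.5`.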
#[inline]
pub fn duration_to_seconds(d: Duration) -> f64 {
    let nanos = f64::from(d.subsec_nanos()) / 1e9;
    d.as_secs() as f64 + nanos
}

#[derive(Clone, Debug)]
pub struct LocalHistogramCore {
    histogram: Histogram,
    counts: Vec<u64>,
    count: u64,
    sum: f64,
}

/// An unsync [`Histogram`].
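///
/// Observations are buffered locally and only become visible on the backing
/// [`Histogram`] after [`LocalHistogram::flush`] is called (or when the local
/// histogram is dropped). A minimal sketch (the metric name is illustrative):
///
/// ```
/// use prometheus::{Histogram, HistogramOpts};
///
/// let histogram =
///     Histogram::with_opts(HistogramOpts::new("op_seconds", "Op durations.")).unwrap();
/// let local = histogram.local();
///
/// local.observe(1.0);
/// assert_eq!(histogram.get_sample_count(), 0); // not flushed yet
///
/// local.flush();
/// assert_eq!(histogram.get_sample_count(), 1);
/// ```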
#[derive(Debug)]
pub struct LocalHistogram {
    core: RefCell<LocalHistogramCore>,
}

impl Clone for LocalHistogram {
    fn clone(&self) -> LocalHistogram {
        let core = self.core.clone();
        let lh = LocalHistogram { core };
        lh.clear();
        lh
    }
}

/// An unsync [`HistogramTimer`].
#[must_use = "Timer should be kept in a variable otherwise it cannot observe duration"]
#[derive(Debug)]
pub struct LocalHistogramTimer {
    /// A local histogram for automatic recording of observations.
    local: LocalHistogram,
    /// Whether the timer has already been observed once.
    observed: bool,
    /// Starting instant for the timer.
    start: Instant,
}

impl LocalHistogramTimer {
    fn new(histogram: LocalHistogram) -> Self {
        Self {
            local: histogram,
            observed: false,
            start: Instant::now(),
        }
    }

    #[cfg(feature = "nightly")]
    fn new_coarse(histogram: LocalHistogram) -> Self {
        Self {
            local: histogram,
            observed: false,
            start: Instant::now_coarse(),
        }
    }

    /// Observe and record timer duration (in seconds).
    ///
    /// It observes the floating-point number of seconds elapsed since the timer
    /// started, and it records that value to the attached histogram.
    pub fn observe_duration(self) {
        self.stop_and_record();
    }

    /// Observe, record and return timer duration (in seconds).
    ///
    /// It observes and returns a floating-point number for seconds elapsed since
    /// the timer started, recording that value to the attached histogram.
    pub fn stop_and_record(self) -> f64 {
        let mut timer = self;
        timer.observe(true)
    }

    /// Observe and return timer duration (in seconds).
    ///
    /// It returns a floating-point number of seconds elapsed since the timer started,
    /// without recording to any histogram.
    pub fn stop_and_discard(self) -> f64 {
        let mut timer = self;
        timer.observe(false)
    }

    fn observe(&mut self, record: bool) -> f64 {
        let v = self.start.elapsed_sec();
        self.observed = true;
        if record {
            self.local.observe(v);
        }
        v
    }
}

impl Drop for LocalHistogramTimer {
    fn drop(&mut self) {
        if !self.observed {
            self.observe(true);
        }
    }
}

impl LocalHistogramCore {
    fn new(histogram: Histogram) -> LocalHistogramCore {
        let counts = vec![0; histogram.core.upper_bounds.len()];

        LocalHistogramCore {
            histogram,
            counts,
            count: 0,
            sum: 0.0,
        }
    }

    pub fn observe(&mut self, v: f64) {
        // Try to find the bucket.
        let mut iter = self
            .histogram
            .core
            .upper_bounds
            .iter()
            .enumerate()
            .filter(|&(_, f)| v <= *f);
        if let Some((i, _)) = iter.next() {
            self.counts[i] += 1;
        }

        self.count += 1;
        self.sum += v;
    }

    pub fn clear(&mut self) {
        for v in &mut self.counts {
            *v = 0
        }

        self.count = 0;
        self.sum = 0.0;
    }

    pub fn flush(&mut self) {
        // No cached metric, return.
        if self.count == 0 {
            return;
        }

        {
            // The collect code path uses `self.shard_and_count` and
            // `self.shards[x].count` to ensure not to collect data from a shard
            // while observe calls are still operating on it.
            //
            // To ensure the above, this `inc_by` needs to use `Acquire`
            // ordering to force anything below this line to stay below it.
            let (shard_index, _count) = self
                .histogram
                .core
                .shard_and_count
                .inc_by(self.count, Ordering::Acquire);
            let shard = &self.histogram.core.shards[shard_index as usize];

            for (i, v) in self.counts.iter().enumerate() {
                if *v > 0 {
                    shard.buckets[i].inc_by(*v);
                }
            }

            shard.sum.inc_by(self.sum);
            // Use `Release` ordering to ensure all operations above stay above.
            shard
                .count
                .inc_by_with_ordering(self.count, Ordering::Release);
        }

        self.clear()
    }

    fn sample_sum(&self) -> f64 {
        self.sum
    }

    fn sample_count(&self) -> u64 {
        self.count
    }
}

impl LocalHistogram {
    fn new(histogram: Histogram) -> LocalHistogram {
        let core = LocalHistogramCore::new(histogram);
        LocalHistogram {
            core: RefCell::new(core),
        }
    }

    /// Add a single observation to the [`Histogram`].
    pub fn observe(&self, v: f64) {
        self.core.borrow_mut().observe(v);
    }

    /// Return a `LocalHistogramTimer` to track a duration.
    pub fn start_timer(&self) -> LocalHistogramTimer {
        LocalHistogramTimer::new(self.clone())
    }

    /// Return a `LocalHistogramTimer` to track a duration.
    /// It is faster but less precise.
    #[cfg(feature = "nightly")]
    pub fn start_coarse_timer(&self) -> LocalHistogramTimer {
        LocalHistogramTimer::new_coarse(self.clone())
    }

    /// Observe execution time of a closure, in seconds.
    pub fn observe_closure_duration<F, T>(&self, f: F) -> T
    where
        F: FnOnce() -> T,
    {
        let instant = Instant::now();
        let res = f();
        let elapsed = instant.elapsed_sec();
        self.observe(elapsed);
        res
    }

    /// Observe execution time of a closure, in seconds.
    #[cfg(feature = "nightly")]
    pub fn observe_closure_duration_coarse<F, T>(&self, f: F) -> T
    where
        F: FnOnce() -> T,
    {
        let instant = Instant::now_coarse();
        let res = f();
        let elapsed = instant.elapsed_sec();
        self.observe(elapsed);
        res
    }

    /// Clear the local metric.
    pub fn clear(&self) {
        self.core.borrow_mut().clear();
    }

    /// Flush the local metrics to the [`Histogram`] metric.
    pub fn flush(&self) {
        self.core.borrow_mut().flush();
    }

    /// Return accumulated sum of local samples.
    pub fn get_sample_sum(&self) -> f64 {
        self.core.borrow().sample_sum()
    }

    /// Return count of local samples.
    pub fn get_sample_count(&self) -> u64 {
        self.core.borrow().sample_count()
    }
}

impl LocalMetric for LocalHistogram {
    /// Flush the local metrics to the [`Histogram`] metric.
    fn flush(&self) {
        LocalHistogram::flush(self);
    }
}

impl Drop for LocalHistogram {
    fn drop(&mut self) {
        self.flush()
    }
}

/// An unsync [`HistogramVec`].
#[derive(Debug)]
pub struct LocalHistogramVec {
    vec: HistogramVec,
    local: HashMap<u64, LocalHistogram>,
}

impl LocalHistogramVec {
    fn new(vec: HistogramVec) -> LocalHistogramVec {
        let local = HashMap::with_capacity(vec.v.children.read().len());
        LocalHistogramVec { vec, local }
    }

    /// Get a [`LocalHistogram`] by label values.
    /// See [`MetricVec::with_label_values`] for details.
    pub fn with_label_values<'a>(&'a mut self, vals: &[&str]) -> &'a LocalHistogram {
        let hash = self.vec.v.hash_label_values(vals).unwrap();
        let vec = &self.vec;
        self.local
            .entry(hash)
            .or_insert_with(|| vec.with_label_values(vals).local())
    }

    /// Remove a [`LocalHistogram`] by label values.
    /// See [`MetricVec::remove_label_values`] for details.
    pub fn remove_label_values(&mut self, vals: &[&str]) -> Result<()> {
        let hash = self.vec.v.hash_label_values(vals)?;
        self.local.remove(&hash);
        self.vec.v.delete_label_values(vals)
    }

    /// Flush the local metrics to the [`HistogramVec`] metric.
    pub fn flush(&self) {
        for h in self.local.values() {
            h.flush();
        }
    }
}

impl LocalMetric for LocalHistogramVec {
    /// Flush the local metrics to the [`HistogramVec`] metric.
    fn flush(&self) {
        LocalHistogramVec::flush(self)
    }
}

impl Clone for LocalHistogramVec {
    fn clone(&self) -> LocalHistogramVec {
        LocalHistogramVec::new(self.vec.clone())
    }
}

#[cfg(test)]
mod tests {
    use std::f64::{EPSILON, INFINITY};
    use std::thread;
    use std::time::Duration;

    use super::*;
    use crate::metrics::{Collector, Metric};

    #[test]
    fn test_histogram() {
        let opts = HistogramOpts::new("test1", "test help")
            .const_label("a", "1")
            .const_label("b", "2");
        let histogram = Histogram::with_opts(opts).unwrap();
        histogram.observe(1.0);

        let timer = histogram.start_timer();
        thread::sleep(Duration::from_millis(100));
        timer.observe_duration();

        let timer = histogram.start_timer();
        let handler = thread::spawn(move || {
            let _timer = timer;
            thread::sleep(Duration::from_millis(400));
        });
        assert!(handler.join().is_ok());

        let mut mfs = histogram.collect();
        assert_eq!(mfs.len(), 1);

        let mf = mfs.pop().unwrap();
        let m = mf.get_metric().get(0).unwrap();
        assert_eq!(m.get_label().len(), 2);
        let proto_histogram = m.get_histogram();
        assert_eq!(proto_histogram.get_sample_count(), 3);
        assert!(proto_histogram.get_sample_sum() >= 1.5);
        assert_eq!(proto_histogram.get_bucket().len(), DEFAULT_BUCKETS.len());

        let buckets = vec![1.0, 2.0, 3.0];
        let opts = HistogramOpts::new("test2", "test help").buckets(buckets.clone());
        let histogram = Histogram::with_opts(opts).unwrap();
        let mut mfs = histogram.collect();
        assert_eq!(mfs.len(), 1);

        let mf = mfs.pop().unwrap();
        let m = mf.get_metric().get(0).unwrap();
        assert_eq!(m.get_label().len(), 0);
        let proto_histogram = m.get_histogram();
        assert_eq!(proto_histogram.get_sample_count(), 0);
        assert!((proto_histogram.get_sample_sum() - 0.0) < EPSILON);
        assert_eq!(proto_histogram.get_bucket().len(), buckets.len())
    }

    #[test]
    #[cfg(feature = "nightly")]
    fn test_histogram_coarse_timer() {
        let opts = HistogramOpts::new("test1", "test help");
        let histogram = Histogram::with_opts(opts).unwrap();

        let timer = histogram.start_coarse_timer();
        thread::sleep(Duration::from_millis(100));
        timer.observe_duration();

        let timer = histogram.start_coarse_timer();
        let handler = thread::spawn(move || {
            let _timer = timer;
            thread::sleep(Duration::from_millis(400));
        });
        assert!(handler.join().is_ok());

        histogram.observe_closure_duration(|| {
            thread::sleep(Duration::from_millis(400));
        });

        let mut mfs = histogram.collect();
        assert_eq!(mfs.len(), 1);

        let mf = mfs.pop().unwrap();
        let m = mf.get_metric().get(0).unwrap();
        let proto_histogram = m.get_histogram();
        assert_eq!(proto_histogram.get_sample_count(), 3);
        assert!((proto_histogram.get_sample_sum() - 0.0) > EPSILON);
    }

    #[test]
    #[cfg(feature = "nightly")]
    fn test_instant_on_smp() {
        let zero = Duration::from_millis(0);
        for i in 0..100_000 {
            let now = Instant::now();
            let now_coarse = Instant::now_coarse();
            if i % 100 == 0 {
                thread::yield_now();
            }
            assert!(now.elapsed() >= zero);
            assert!(now_coarse.elapsed() >= zero);
        }
    }

    #[test]
    fn test_buckets_invalidation() {
        let table = vec![
            (vec![], true, DEFAULT_BUCKETS.len()),
            (vec![-2.0, -1.0, -0.5, 0.0, 0.5, 1.0, 2.0], true, 7),
            (vec![-2.0, -1.0, -0.5, 10.0, 0.5, 1.0, 2.0], false, 7),
            (vec![-2.0, -1.0, -0.5, 0.0, 0.5, 1.0, INFINITY], true, 6),
        ];

        for (buckets, is_ok, length) in table {
            let got = check_and_adjust_buckets(buckets);
            assert_eq!(got.is_ok(), is_ok);
            if is_ok {
                assert_eq!(got.unwrap().len(), length);
            }
        }
    }

    #[test]
    fn test_buckets_functions() {
        let linear_table = vec![
            (
                -15.0,
                5.0,
                6,
                true,
                vec![-15.0, -10.0, -5.0, 0.0, 5.0, 10.0],
            ),
            (-15.0, 0.0, 6, false, vec![]),
            (-15.0, 5.0, 0, false, vec![]),
        ];

        for (param1, param2, param3, is_ok, vec) in linear_table {
            let got = linear_buckets(param1, param2, param3);
            assert_eq!(got.is_ok(), is_ok);
            if got.is_ok() {
                assert_eq!(got.unwrap(), vec);
            }
        }

        let exponential_table = vec![
            (100.0, 1.2, 3, true, vec![100.0, 120.0, 144.0]),
            (100.0, 0.5, 3, false, vec![]),
            (100.0, 1.2, 0, false, vec![]),
        ];

        for (param1, param2, param3, is_ok, vec) in exponential_table {
            let got = exponential_buckets(param1, param2, param3);
            assert_eq!(got.is_ok(), is_ok);
            if got.is_ok() {
                assert_eq!(got.unwrap(), vec);
            }
        }
    }

    #[test]
    fn test_duration_to_seconds() {
        let tbls = vec![(1000, 1.0), (1100, 1.1), (100_111, 100.111)];
        for (millis, seconds) in tbls {
            let d = Duration::from_millis(millis);
            let v = duration_to_seconds(d);
            assert!((v - seconds).abs() < EPSILON);
        }
    }

    #[test]
    fn test_histogram_vec_with_label_values() {
        let vec = HistogramVec::new(
            HistogramOpts::new("test_histogram_vec", "test histogram vec help"),
            &["l1", "l2"],
        )
        .unwrap();

        assert!(vec.remove_label_values(&["v1", "v2"]).is_err());
        vec.with_label_values(&["v1", "v2"]).observe(1.0);
        assert!(vec.remove_label_values(&["v1", "v2"]).is_ok());

        assert!(vec.remove_label_values(&["v1"]).is_err());
        assert!(vec.remove_label_values(&["v1", "v3"]).is_err());
    }

    #[test]
    fn test_histogram_vec_with_opts_buckets() {
        let labels = ["l1", "l2"];
        let buckets = vec![1.0, 2.0, 3.0];
        let vec = HistogramVec::new(
            HistogramOpts::new("test_histogram_vec", "test histogram vec help")
                .buckets(buckets.clone()),
            &labels,
        )
        .unwrap();

        let histogram = vec.with_label_values(&["v1", "v2"]);
        histogram.observe(1.0);

        let m = histogram.metric();
        assert_eq!(m.get_label().len(), labels.len());

        let proto_histogram = m.get_histogram();
        assert_eq!(proto_histogram.get_sample_count(), 1);
        assert!((proto_histogram.get_sample_sum() - 1.0) < EPSILON);
        assert_eq!(proto_histogram.get_bucket().len(), buckets.len())
    }

    #[test]
    fn test_histogram_local() {
        let buckets = vec![1.0, 2.0, 3.0];
        let opts = HistogramOpts::new("test_histogram_local", "test histogram local help")
            .buckets(buckets.clone());
        let histogram = Histogram::with_opts(opts).unwrap();
        let local = histogram.local();

        let check = |count, sum| {
            let m = histogram.metric();
            let proto_histogram = m.get_histogram();
            assert_eq!(proto_histogram.get_sample_count(), count);
            assert!((proto_histogram.get_sample_sum() - sum) < EPSILON);
        };

        local.observe(1.0);
        local.observe(4.0);
        check(0, 0.0);

        local.flush();
        check(2, 5.0);

        local.observe(2.0);
        local.clear();
        check(2, 5.0);

        local.observe(2.0);
        drop(local);
        check(3, 7.0);
    }

    #[test]
    fn test_histogram_vec_local() {
        let vec = HistogramVec::new(
            HistogramOpts::new("test_histogram_vec_local", "test histogram vec help"),
            &["l1", "l2"],
        )
        .unwrap();
        let mut local_vec = vec.local();

        vec.remove_label_values(&["v1", "v2"]).unwrap_err();
        local_vec.remove_label_values(&["v1", "v2"]).unwrap_err();

        let check = |count, sum| {
            let ms = vec.collect()[0].take_metric();
            let proto_histogram = ms[0].get_histogram();
            assert_eq!(proto_histogram.get_sample_count(), count);
            assert!((proto_histogram.get_sample_sum() - sum) < EPSILON);
        };

        {
            // Flush LocalHistogram
            let h = local_vec.with_label_values(&["v1", "v2"]);
            h.observe(1.0);
            h.flush();
            check(1, 1.0);
        }

        {
            // Flush LocalHistogramVec
            local_vec.with_label_values(&["v1", "v2"]).observe(4.0);
            local_vec.flush();
            check(2, 5.0);
        }
        {
            // Reset ["v1", "v2"]
            local_vec.remove_label_values(&["v1", "v2"]).unwrap();

            // Flush on drop
            local_vec.with_label_values(&["v1", "v2"]).observe(2.0);
            drop(local_vec);
            check(1, 2.0);
        }
    }

    /// Ensure that when an observe and a collect operation interleave, the
    /// latter does not expose a snapshot of the histogram that does not uphold
    /// all histogram invariants.
    #[test]
    fn atomic_observe_across_collects() {
        let done = Arc::new(std::sync::atomic::AtomicBool::default());
        let histogram =
            Histogram::with_opts(HistogramOpts::new("test_name", "test help").buckets(vec![1.0]))
                .unwrap();

        let done_clone = done.clone();
        let histogram_clone = histogram.clone();
        let observing_thread = std::thread::spawn(move || loop {
            if done_clone.load(std::sync::atomic::Ordering::Relaxed) {
                break;
            }

            for _ in 0..1_000_000 {
                histogram_clone.observe(1.0);
            }
        });

        let mut sample_count = 0;
        let mut cumulative_count = 0;
        let mut sample_sum = 0;
        for _ in 0..1_000_000 {
            let metric = &histogram.collect()[0].take_metric()[0];
            let proto = metric.get_histogram();

            sample_count = proto.get_sample_count();
            sample_sum = proto.get_sample_sum() as u64;
            // There is only one bucket thus the `[0]`.
            cumulative_count = proto.get_bucket()[0].get_cumulative_count();

            if sample_count != cumulative_count {
                break;
            }

            // The observation value is always `1.0`, thus count and sum should
            // always be equal. The number of `observe` calls is limited to
            // 1_000_000, thus the sum is limited to 1_000_000. An f64 can
            // represent the sum accurately up to 9_007_199_254_740_992.
            if sample_count != sample_sum {
                break;
            }
        }

        done.store(true, std::sync::atomic::Ordering::Relaxed);
        observing_thread.join().unwrap();

        if sample_count != cumulative_count {
            panic!(
                "Histogram invariant violated: For a histogram with a single \
                 bucket observing values below the bucket's upper bound only \
                 the histogram's count should always be equal to the buckets's \
                 cumulative count, got {:?} and {:?} instead.",
                sample_count, cumulative_count,
            );
        }

        if sample_count != sample_sum {
            panic!(
                "Histogram invariant violated: For a histogram which is only \
                 ever observing a value of `1.0` the sample count should equal \
                 the sum, instead got: {:?} and {:?}",
                sample_count, sample_sum,
            )
        }
    }

    #[test]
    fn test_error_on_inconsistent_label_cardinality() {
        let hist = Histogram::with_opts(
            histogram_opts!(
                "example_histogram",
                "Used as an example",
                vec![0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 5.0]
            )
            .variable_label("example_variable"),
        );

        if let Err(Error::InconsistentCardinality { expect, got }) = hist {
            assert_eq!(1, expect);
            assert_eq!(0, got);
        } else {
            panic!("Expected InconsistentCardinality error.")
        }
    }
}