// histogram 0.7.0
//
// Fast and simple atomic histograms
// Documentation
// Copyright 2022 Twitter, Inc.
// Licensed under the Apache License, Version 2.0
// http://www.apache.org/licenses/LICENSE-2.0

use crate::*;

use core::sync::atomic::AtomicU32;
use core::sync::atomic::Ordering;

/// A `Histogram` groups recorded values into buckets of similar values and
/// tracks counts for recorded values that fall into those ranges.
///
/// The layout is fixed at construction time by three exponents (`m`, `r`,
/// `n`); the derived values `M`, `R`, `N` and `G` are computed once in
/// `Histogram::new` and stored alongside them. Counts are held in atomics, so
/// the recording methods only need a shared reference.
// Upper-case field names mirror the notation used in the documentation.
#[allow(non_snake_case)]
// NOTE(review): `M` and `R` are not read anywhere in the visible part of this
// file, which is presumably why dead-code warnings are silenced - confirm.
#[allow(dead_code)]
pub struct Histogram {
    // minimum resolution parameter: the exponent `m` in `M = 2^m`
    m: u32,
    // minimum resolution range parameter: the exponent `r` in `R = 2^r - 1`
    r: u32,
    // maximum value parameter: the exponent `n` in `N = 2^n - 1`
    n: u32,

    // minimum resolution value `M = 2^m` (width of the smallest bucket)
    M: u64,
    // minimum resolution upper bound `R = 2^r - 1`
    R: u64,
    // maximum value `N = 2^n - 1`; larger values are rejected when recording
    N: u64,
    // grouping factor `G = 2^(r - m - 1)`, used to split a bucket index into
    // a group number and an offset within that group
    G: u64,

    // buckets of ranges that hold actual counts; each count is a 32-bit atomic
    buckets: Box<[AtomicU32]>,
}

/// A `Builder` allows for constructing a `Histogram` with the desired
/// configuration.
///
/// It only stores the three exponents that are handed to `Histogram::new`
/// when `build` is called; no validation happens until then.
pub struct Builder {
    // minimum resolution parameter: the exponent `m` in `M = 2^m`
    m: u32,
    // minimum resolution range parameter: the exponent `r` in `R = 2^r - 1`
    r: u32,
    // maximum value parameter: the exponent `n` in `N = 2^n - 1`
    n: u32,
}

impl Builder {
    /// Consume the `Builder` and return a `Histogram`.
    ///
    /// # Errors
    /// Forwards whatever `Histogram::new` returns for the configured
    /// parameters, e.g. an error for an inconsistent configuration.
    pub fn build(self) -> Result<Histogram, Error> {
        Histogram::new(self.m, self.r, self.n)
    }

    /// Sets the width of the smallest bucket in the `Histogram`.
    ///
    /// As the `Histogram` uses base-2 internally, the resolution will be the
    /// largest power of two that is less than or equal to the provided value.
    /// For example, if the minimum resolution is set to 10, the width of the
    /// smallest bucket will be 8. A width of 0 is treated like a width of 1.
    pub fn min_resolution(mut self, width: u64) -> Self {
        // floor(log2(width)): the index of the highest set bit. Clamping to 1
        // keeps the subtraction from underflowing for a width of 0.
        self.m = 63 - width.max(1).leading_zeros();
        self
    }

    /// Sets the maximum value that the minimum resolution extends to.
    ///
    /// This value should be greater than the minimum resolution. The value is
    /// first rounded up to a power of two `P` (values that already are a
    /// power of two are kept), and the range exponent is set to
    /// `r = log2(P) + 1`, so the minimum resolution extends to `2P - 1`.
    /// Values above `2^63` select the largest exponent, `r = 64`.
    pub fn min_resolution_range(mut self, value: u64) -> Self {
        self.r = Self::exponent_for(value);
        self
    }

    /// Sets the maximum value that can be recorded into the `Histogram`.
    ///
    /// The value is first rounded up to a power of two `P` (values that
    /// already are a power of two are kept), and the maximum value exponent is
    /// set to `n = log2(P) + 1`, so every value up to `2P - 1` can be
    /// recorded. Values above `2^63` select the largest exponent, `n = 64`.
    pub fn maximum_value(mut self, value: u64) -> Self {
        self.n = Self::exponent_for(value);
        self
    }

    /// Number of bits needed to represent `value` rounded up to a power of two.
    fn exponent_for(value: u64) -> u32 {
        // `next_power_of_two` overflows for values above 2^63; the checked
        // variant lets those inputs saturate at the widest exponent instead.
        value
            .checked_next_power_of_two()
            .map_or(64, |pow| 64 - pow.leading_zeros())
    }
}

impl Histogram {
    /// Construct a new histogram by providing the configuration directly.
    ///
    /// - `m` - sets the minimum resolution `M = 2^m`. This is the smallest unit
    /// of quantification, which is also the smallest bucket width. If the input
    /// values are always integers, choosing `m=0` would ensure precise
    /// recording for the smallest values.
    ///
    /// - `r` - sets the minimum resolution range `R = 2^r - 1`. The selected
    /// value must be greater than the minimum resolution `m`. This sets the
    /// maximum value that the minimum resolution should extend to.
    ///
    /// - `n` - sets the maximum value `N = 2^n - 1`. The selected value must be
    /// greater than or equal to the minimum resolution range `r`.
    ///
    /// # Panics
    /// This will panic if an invalid configuration is specified.
    #[allow(non_snake_case)]
    pub fn new(m: u32, r: u32, n: u32) -> Result<Self, Error> {
        if r <= m || r > n || n > 64 {
            return Err(Error::InvalidConfig);
        }

        let M = 1 << m;
        let R = if r == 64 { u64::MAX } else { (1 << r) - 1 };
        let N = if n == 64 { u64::MAX } else { (1 << n) - 1 };
        let G: u64 = 1 << (r - m - 1);

        let n_buckets = (n - r + 2) as u64 * G;

        let mut buckets = Vec::new();
        buckets.resize_with(n_buckets as usize, || AtomicU32::new(0));

        Ok(Self {
            m,
            r,
            n,
            M,
            R,
            N,
            G,
            buckets: buckets.into_boxed_slice(),
        })
    }

    /// Creates a `Builder` with the default values `m = 0`, `r = 10`, `n = 30`.
    ///
    /// This would create a `Histogram` with 11264 buckets which can store
    /// values from 1 to 1_073_741_823 with values 1 to 1023 being stored in
    /// buckets with a width of 1. Such a `Histogram` would be appropriate for
    /// latencies measured in nanoseconds where the max expected latency is one
    /// second.
    pub fn builder() -> Builder {
        Builder { m: 0, r: 10, n: 30 }
    }

    /// Resets the `Histogram` by zeroing out the count for every bucket.
    pub fn clear(&self) {
        for bucket in self.buckets.iter() {
            bucket.store(0, Ordering::Relaxed);
        }
    }

    /// Increment the histogram bucket corresponding to the provided `value` by
    /// the provided `count`.
    ///
    /// This operation wraps on overflow.
    #[allow(clippy::result_unit_err)]
    pub fn increment(&self, value: u64, count: u32) -> Result<(), Error> {
        if value > self.N {
            // value too big
            return Err(Error::OutOfRange);
        }

        let index = self.bucket_index(value);
        self.buckets[index].fetch_add(count, Ordering::Relaxed);

        Ok(())
    }

    /// Decrement the histogram bucket corresponding to the provided `value` by
    /// the provided `count`.
    ///
    /// This operation wraps on overflow.
    #[allow(clippy::result_unit_err)]
    pub fn decrement(&self, value: u64, count: u32) -> Result<(), Error> {
        if value > self.N {
            // value too big
            return Err(Error::OutOfRange);
        }

        let index = self.bucket_index(value);
        self.buckets[index].fetch_add(count, Ordering::Relaxed);

        Ok(())
    }

    /// Retrieve the `Bucket` which corresponds to the provided percentile.
    ///
    /// An error will be returned if the percentile is invalid or if there are
    /// no samples in the `Histogram`.
    ///
    /// Note: if you are reporting on multiple percentiles, it is more efficient
    /// to use the `percentiles` function to retrieve multiple percentiles in a
    /// single call.
    pub fn percentile(&self, percentile: f64) -> Result<Bucket, Error> {
        if !(0.0..=100.0).contains(&percentile) {
            return Err(Error::InvalidPercentile);
        }

        let total: u64 = self
            .buckets
            .iter()
            .map(|v| v.load(Ordering::Relaxed) as u64)
            .sum();
        if total == 0 {
            return Err(Error::Empty);
        }

        let mut threshold = (percentile * total as f64 / 100.0).ceil() as u64;
        if threshold == 0 {
            threshold += 1;
        }

        let mut seen = 0;
        let mut max = 0;

        for (id, count) in self
            .buckets
            .iter()
            .map(|b| b.load(Ordering::Relaxed) as u64)
            .enumerate()
        {
            if count > 0 {
                max = id;
            }

            seen += count;

            if seen >= threshold {
                return Ok(self.get_bucket(id));
            }
        }

        // if a bucket can't be found for the percentile, return the max bucket
        // seen while walking the histogram. this may be necessary if there is a
        // concurrent modification that reduces the counts before we have a
        // chance to get to that bucket
        Ok(self.get_bucket(max))
    }

    /// Returns a set of percentiles in a single and efficient bulk operation.
    /// Note that the returned percentiles will be sorted from lowest to highest
    /// in the result, even if they do not appear in that order in the provided
    /// set of requested percentiles.
    pub fn percentiles(&self, percentiles: &[f64]) -> Result<Vec<Percentile>, Error> {
        let mut percentiles = percentiles.to_owned();
        percentiles.sort_by(|a, b| a.partial_cmp(b).unwrap());

        for percentile in &percentiles {
            if !(0.0..=100.0).contains(percentile) {
                return Err(Error::InvalidPercentile);
            }
        }

        let total: u64 = self
            .buckets
            .iter()
            .map(|v| v.load(Ordering::Relaxed) as u64)
            .sum();
        if total == 0 {
            return Err(Error::Empty);
        }

        let thresholds: Vec<u64> = percentiles
            .iter()
            .map(|v| std::cmp::min(1, (v * total as f64 / 100.0).ceil() as u64))
            .collect();

        let mut max = 0;
        let mut seen = 0;
        let mut threshold_idx = 0;

        let mut result = Vec::with_capacity(thresholds.len());

        for (bucket_idx, count) in self
            .buckets
            .iter()
            .map(|b| b.load(Ordering::Relaxed) as u64)
            .enumerate()
        {
            if count > 0 {
                max = bucket_idx;
            }

            seen += count;
            while seen > thresholds[threshold_idx] && threshold_idx < thresholds.len() {
                result.push(Percentile {
                    percentile: percentiles[threshold_idx],
                    bucket: self.get_bucket(bucket_idx),
                });

                threshold_idx += 1;
            }

            if threshold_idx >= thresholds.len() {
                break;
            }
        }

        // pad the results with the max bucket seen while walking the histogram
        // this may be necessary if there is a concurrent modification that
        // reduces the counts before we have a chance to get to that bucket
        while result.len() < percentiles.len() {
            // get the index within the percentiles vec
            let idx = percentiles.len() - result.len() - 1;
            result.push(Percentile {
                percentile: percentiles[idx],
                bucket: self.get_bucket(max),
            });
        }

        Ok(result)
    }

    /// Merges counts from the other `Histogram` into this `Histogram`. Returns
    /// an error if there are differences in the configurations of both
    /// `Histogram`s.
    #[allow(clippy::result_unit_err)]
    pub fn merge(&self, other: &Self) -> Result<(), Error> {
        // make sure they match
        if self.m != other.m || self.r != other.r || self.n != other.n {
            return Err(Error::IncompatibleHistogram);
        }

        for (idx, value) in other
            .buckets
            .iter()
            .map(|v| v.load(Ordering::Relaxed))
            .enumerate()
        {
            self.buckets[idx].fetch_add(value, Ordering::Relaxed);
        }

        Ok(())
    }

    /// Subtracts the other `Histogram` from this `Histogram`. Returns an error
    /// if there are differences in the configurations of both `Histogram`s.
    pub fn subtract(&self, other: &Self) -> Result<(), Error> {
        // make sure they match
        if self.m != other.m || self.r != other.r || self.n != other.n {
            return Err(Error::IncompatibleHistogram);
        }

        for (idx, value) in other
            .buckets
            .iter()
            .map(|v| v.load(Ordering::Relaxed))
            .enumerate()
        {
            self.buckets[idx].fetch_sub(value, Ordering::Relaxed);
        }

        Ok(())
    }

    pub fn buckets(&self) -> usize {
        self.buckets.len()
    }

    fn low(&self, idx: usize) -> u64 {
        let idx = idx as u64;
        let m = self.m as u64;
        let r = self.r as u64;
        let g = idx >> (self.r - self.m - 1);
        let b = idx - g * self.G;

        if g < 1 {
            (1 << m) * b
        } else {
            (1 << (r + g - 2)) + (1 << (m + g - 1)) * b
        }
    }

    fn high(&self, idx: usize) -> u64 {
        let idx = idx as u64;
        let m = self.m as u64;
        let r = self.r as u64;
        let g = idx >> (self.r - self.m - 1);
        let b = idx - g * self.G + 1;

        if g < 1 {
            (1 << m) * b - 1
        } else {
            (1 << (r + g - 2)) + (1 << (m + g - 1)) * b - 1
        }
    }

    fn get_bucket(&self, idx: usize) -> Bucket {
        let low = self.low(idx);
        let high = self.high(idx);

        Bucket {
            low,
            high,
            count: self.buckets[idx].load(Ordering::Relaxed),
        }
    }

    fn bucket_index(&self, value: u64) -> usize {
        if value == 0 {
            return 0;
        }

        let m = self.m as u64;
        let r = self.r as u64;

        let h = (63 - value.leading_zeros()) as u64;

        if h < r {
            (value >> m) as usize
        } else {
            let d = h - r + 1;
            ((d + 1) * self.G + ((value - (1 << h)) >> (m + d))) as usize
        }
    }
}

impl Clone for Histogram {
    /// Creates an independent `Histogram` with the same configuration and a
    /// copy of every bucket count as read at the time of the call.
    fn clone(&self) -> Self {
        // SAFETY: unwrap is safe because we already have a histogram with these
        // values for the parameters
        let copy = Histogram::new(self.m, self.r, self.n).unwrap();

        // both histograms share one configuration, so the bucket slices line
        // up one-to-one
        for (target, source) in copy.buckets.iter().zip(self.buckets.iter()) {
            let count = source.load(Ordering::Relaxed);
            target.store(count, Ordering::Relaxed);
        }

        copy
    }
}

/// An iterator that allows walking through the `Bucket`s within a `Histogram`.
///
/// It borrows the `Histogram` for its whole lifetime and yields one `Bucket`
/// per bucket index, starting at index 0.
pub struct HistogramIter<'a> {
    // index of the bucket that the next call to `next` will return
    current: usize,
    // the histogram being walked
    histogram: &'a Histogram,
}

impl<'a> IntoIterator for &'a Histogram {
    type Item = Bucket;
    type IntoIter = HistogramIter<'a>;

    /// Starts an iteration over the buckets of the borrowed `Histogram`,
    /// beginning with the bucket at index 0.
    fn into_iter(self) -> Self::IntoIter {
        let histogram = self;
        HistogramIter {
            histogram,
            current: 0,
        }
    }
}

impl<'a> Iterator for HistogramIter<'a> {
    type Item = Bucket;

    /// Yields the bucket at the current position and advances by one, or
    /// returns `None` once every bucket has been visited.
    fn next(&mut self) -> Option<Bucket> {
        let position = self.current;
        if position >= self.histogram.buckets.len() {
            return None;
        }

        // read the bucket before advancing so the position only moves once a
        // bucket has actually been produced
        let bucket = self.histogram.get_bucket(position);
        self.current = position + 1;
        Some(bucket)
    }
}