use std::convert::From;
use std::sync::{Arc, RwLock};
use std::collections::HashMap;
use std::time::{Instant, Duration};
use protobuf::RepeatedField;
use proto;
use desc::Desc;
use errors::{Result, Error};
use value::make_label_pairs;
use vec::{MetricVec, MetricVecBuilder};
use metrics::{Collector, Metric, Opts};
/// Upper bounds of the default histogram buckets.
///
/// Used by `HistogramOpts::new`, by `From<Opts> for HistogramOpts`, and by
/// `check_and_adjust_buckets` when it is handed an empty bucket list.
pub const DEFAULT_BUCKETS: &'static [f64; 11] = &[0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0,
2.5, 5.0, 10.0];
/// Label name reserved for histogram buckets; `check_bucket_lable` rejects it as a user label.
pub const BUCKET_LABEL: &'static str = "le";
#[inline]
fn check_bucket_lable(label: &str) -> Result<()> {
if label == BUCKET_LABEL {
return Err(Error::Msg("`le` is not allowed as label name in histograms".to_owned()));
}
Ok(())
}
pub fn check_and_adjust_buckets(mut buckets: Vec<f64>) -> Result<Vec<f64>> {
if buckets.is_empty() {
buckets = Vec::from(DEFAULT_BUCKETS as &'static [f64]);
}
for (i, upper_bound) in buckets.iter().enumerate() {
if i < (buckets.len() - 1) && *upper_bound >= buckets[i + 1] {
return Err(Error::Msg(format!("histogram buckets must be in increasing \
order: {} >= {}",
upper_bound,
buckets[i + 1])));
}
}
let tail = *buckets.last().unwrap();
if tail.is_sign_positive() && tail.is_infinite() {
buckets.pop();
}
Ok(buckets)
}
/// Options for creating a histogram: the common metric options plus the bucket bounds.
pub struct HistogramOpts {
/// Common metric options; this file reads its `namespace`, `subsystem`, `help` and
/// `const_labels` fields.
pub common_opts: Opts,
/// Bucket upper bounds. `Histogram::with_opts` validates them through
/// `check_and_adjust_buckets`.
pub buckets: Vec<f64>,
}
impl HistogramOpts {
pub fn new<S: Into<String>>(name: S, help: S) -> HistogramOpts {
HistogramOpts {
common_opts: Opts::new(name, help),
buckets: Vec::from(DEFAULT_BUCKETS as &'static [f64]),
}
}
pub fn namespace<S: Into<String>>(mut self, namesapce: S) -> Self {
self.common_opts.namespace = namesapce.into();
self
}
pub fn subsystem<S: Into<String>>(mut self, subsystem: S) -> Self {
self.common_opts.subsystem = subsystem.into();
self
}
pub fn const_labels(mut self, labels: HashMap<String, String>) -> Self {
self.common_opts = self.common_opts.const_labels(labels);
self
}
pub fn const_label<S: Into<String>>(mut self, name: S, value: S) -> Self {
self.common_opts = self.common_opts.const_label(name, value);
self
}
pub fn fq_name(&self) -> String {
self.common_opts.fq_name()
}
pub fn buckets(mut self, buckets: Vec<f64>) -> Self {
self.buckets = buckets;
self
}
}
impl From<Opts> for HistogramOpts {
    /// Wraps plain `Opts` into histogram options that use `DEFAULT_BUCKETS`.
    fn from(opts: Opts) -> HistogramOpts {
        let default_buckets = DEFAULT_BUCKETS.to_vec();
        HistogramOpts {
            common_opts: opts,
            buckets: default_buckets,
        }
    }
}
/// Mutable state of a histogram: running totals plus one counter per bucket.
#[derive(Debug)]
struct HistogramCore {
/// Sum of all observed values.
sum: f64,
/// Number of observations.
count: u64,
/// Bucket upper bounds, as returned by `check_and_adjust_buckets`.
upper_bounds: Vec<f64>,
/// Per-bucket hit counters, parallel to `upper_bounds`. These are not cumulative;
/// `proto` accumulates them when exporting.
counts: Vec<u64>,
}
impl HistogramCore {
    /// Builds an empty core for the given bucket bounds.
    ///
    /// # Errors
    /// Propagates the error from `check_and_adjust_buckets` when the bounds are invalid.
    fn with_buckets(buckets: Vec<f64>) -> Result<HistogramCore> {
        let upper_bounds = try!(check_and_adjust_buckets(buckets));
        let counts = vec![0; upper_bounds.len()];

        Ok(HistogramCore {
            sum: 0.0,
            count: 0,
            upper_bounds: upper_bounds,
            counts: counts,
        })
    }

    /// Records one observation.
    ///
    /// Only the first bucket whose upper bound is `>= v` is incremented. A value above
    /// every bound touches no bucket, but still updates `count` and `sum`.
    fn observe(&mut self, v: f64) {
        let first_fit = self.upper_bounds.iter().position(|bound| v <= *bound);
        if let Some(idx) = first_fit {
            self.counts[idx] += 1;
        }

        self.count += 1;
        self.sum += v;
    }

    /// Exports the current state as a `proto::Histogram` with cumulative bucket counts.
    fn proto(&self) -> proto::Histogram {
        let mut cumulative = 0;
        let mut exported = Vec::with_capacity(self.upper_bounds.len());
        for (bound, hits) in self.upper_bounds.iter().zip(self.counts.iter()) {
            // Stored counters are per-bucket; the exported ones are running totals.
            cumulative += *hits;

            let mut bucket = proto::Bucket::new();
            bucket.set_cumulative_count(cumulative);
            bucket.set_upper_bound(*bound);
            exported.push(bucket);
        }

        let mut histogram = proto::Histogram::new();
        histogram.set_sample_sum(self.sum);
        histogram.set_sample_count(self.count);
        histogram.set_bucket(RepeatedField::from_vec(exported));
        histogram
    }
}
impl Default for HistogramCore {
    /// Builds an empty core with `DEFAULT_BUCKETS`.
    ///
    /// An empty bucket list makes `check_and_adjust_buckets` substitute the defaults.
    fn default() -> HistogramCore {
        let no_custom_buckets = Vec::new();
        HistogramCore::with_buckets(no_custom_buckets).unwrap()
    }
}
/// Guard that measures elapsed time and records it into a histogram.
///
/// The elapsed time since creation is observed when the timer is dropped, either
/// implicitly at the end of a scope or explicitly via `observe_duration`.
#[must_use = "the timer records the elapsed time when dropped; discarding it immediately \
              observes a near-zero duration"]
pub struct HistogramTimer<'a> {
    /// Histogram that receives the measured duration.
    histogram: &'a Histogram,
    /// Moment the timer was created.
    start: Instant,
}
impl<'a> HistogramTimer<'a> {
    /// Starts a timer that will report into `histogram`.
    fn new(histogram: &'a Histogram) -> HistogramTimer<'a> {
        let started_at = Instant::now();
        HistogramTimer {
            histogram: histogram,
            start: started_at,
        }
    }

    /// Stops the timer and records the elapsed time.
    ///
    /// The recording itself happens in `Drop`; this method only consumes the timer.
    pub fn observe_duration(self) {
        drop(self);
    }

    /// Observes the time elapsed since the timer was created, in seconds.
    fn observe(&mut self) {
        let elapsed = self.start.elapsed();
        self.histogram.observe(duration_to_seconds(elapsed))
    }
}
impl<'a> Drop for HistogramTimer<'a> {
fn drop(&mut self) {
self.observe();
}
}
/// A histogram metric.
///
/// Cloning copies `desc` and `label_pairs`, while `core` is shared through the `Arc`,
/// so all clones record into the same state.
#[derive(Clone)]
pub struct Histogram {
/// Descriptor returned by `Collector::desc`; also supplies name and help in `collect`.
desc: Desc,
/// Label pairs attached to the exported metric; produced by `make_label_pairs`.
label_pairs: Vec<proto::LabelPair>,
/// Shared state: write-locked by `observe`, read-locked by `metric`.
core: Arc<RwLock<HistogramCore>>,
}
impl Histogram {
pub fn with_opts(opts: HistogramOpts) -> Result<Histogram> {
let desc = try!(Desc::new(opts.fq_name(),
opts.common_opts.help.clone(),
vec![],
opts.common_opts.const_labels.clone()));
Histogram::with_desc_and_buckets(desc, &[], Some(opts.buckets))
}
fn with_desc(desc: Desc, label_values: &[&str]) -> Result<Histogram> {
Histogram::with_desc_and_buckets(desc, label_values, None)
}
fn with_desc_and_buckets(desc: Desc,
label_values: &[&str],
buckets: Option<Vec<f64>>)
-> Result<Histogram> {
for name in &desc.variable_labels {
try!(check_bucket_lable(&name));
}
for pair in &desc.const_label_pairs {
try!(check_bucket_lable(pair.get_name()));
}
let pairs = make_label_pairs(&desc, label_values);
let core = try!(buckets.map_or(Ok(HistogramCore::default()), HistogramCore::with_buckets));
Ok(Histogram {
desc: desc,
label_pairs: pairs,
core: Arc::new(RwLock::new(core)),
})
}
}
impl Histogram {
    /// Records a single observation `v`.
    ///
    /// # Panics
    /// Panics if the lock around the core is poisoned.
    pub fn observe(&self, v: f64) {
        let mut core = self.core.write().unwrap();
        core.observe(v)
    }

    /// Starts a timer that observes the elapsed seconds into this histogram when dropped.
    pub fn start_timer(&self) -> HistogramTimer {
        HistogramTimer::new(self)
    }
}
impl Metric for Histogram {
    /// Exports the histogram and its label pairs as a `proto::Metric`.
    ///
    /// # Panics
    /// Panics if the lock around the core is poisoned.
    fn metric(&self) -> proto::Metric {
        let labels = RepeatedField::from_vec(self.label_pairs.clone());
        let snapshot = {
            let core = self.core.read().unwrap();
            core.proto()
        };

        let mut metric = proto::Metric::new();
        metric.set_label(labels);
        metric.set_histogram(snapshot);
        metric
    }
}
impl Collector for Histogram {
    /// Returns the descriptor of this histogram.
    fn desc(&self) -> &Desc {
        &self.desc
    }

    /// Collects this histogram into a `HISTOGRAM` metric family holding one metric.
    fn collect(&self) -> proto::MetricFamily {
        let metrics = vec![self.metric()];

        let mut family = proto::MetricFamily::new();
        family.set_name(self.desc.fq_name.clone());
        family.set_help(self.desc.help.clone());
        family.set_field_type(proto::MetricType::HISTOGRAM);
        family.set_metric(RepeatedField::from_vec(metrics));
        family
    }
}
#[derive(Clone)]
pub struct HistogramVecBuilder {}
impl MetricVecBuilder for HistogramVecBuilder {
type Output = Histogram;
fn build(&self, desc: &Desc, vals: &[&str]) -> Result<Histogram> {
Histogram::with_desc(desc.clone(), vals)
}
}
pub type HistogramVec = MetricVec<HistogramVecBuilder>;
impl HistogramVec {
pub fn new(opts: HistogramOpts, label_names: &[&str]) -> Result<HistogramVec> {
let variable_names = label_names.iter().map(|s| (*s).to_owned()).collect();
let desc = try!(Desc::new(opts.fq_name(),
opts.common_opts.help,
variable_names,
opts.common_opts.const_labels));
let metric_vec =
MetricVec::create(desc, proto::MetricType::HISTOGRAM, HistogramVecBuilder {});
Ok(metric_vec as HistogramVec)
}
}
pub fn linear_buckets(start: f64, width: f64, count: usize) -> Result<Vec<f64>> {
if count < 1 {
return Err(Error::Msg(format!("LinearBuckets needs a positive count, count: {}", count)));
}
if width <= 0.0 {
return Err(Error::Msg(format!("LinearBuckets needs a width greater then 0, width: {}",
width)));
}
let mut next = start;
let mut buckets = Vec::with_capacity(count);
for _ in 0..count {
buckets.push(next);
next += width;
}
Ok(buckets)
}
/// Creates `count` bucket bounds, starting at `start` and each `factor` times the
/// previous one.
///
/// Bounds are produced by repeated multiplication.
///
/// # Errors
/// Returns `Error::Msg` if `count` is zero, if `start` is NaN or not positive, or if
/// `factor` is NaN or not greater than 1.
pub fn exponential_buckets(start: f64, factor: f64, count: usize) -> Result<Vec<f64>> {
    if count < 1 {
        return Err(Error::Msg(format!("exponential_buckets needs a positive count, count: {}",
                                      count)));
    }
    // NaN fails every comparison, so the range checks alone would let it through.
    if start.is_nan() || start <= 0.0 {
        return Err(Error::Msg(format!("exponential_buckets needs a positive start value, \
                                       start: {}",
                                      start)));
    }
    if factor.is_nan() || factor <= 1.0 {
        return Err(Error::Msg(format!("exponential_buckets needs a factor greater than 1, \
                                       factor: {}",
                                      factor)));
    }

    let mut next = start;
    let mut buckets = Vec::with_capacity(count);
    for _ in 0..count {
        buckets.push(next);
        next *= factor;
    }

    Ok(buckets)
}
/// Converts a `Duration` into fractional seconds.
///
/// The whole seconds and the sub-second nanoseconds are converted separately and
/// then added.
pub fn duration_to_seconds(d: Duration) -> f64 {
    const NANOS_PER_SEC: f64 = 1e9;

    let whole_secs = d.as_secs() as f64;
    let fractional_secs = d.subsec_nanos() as f64 / NANOS_PER_SEC;
    whole_secs + fractional_secs
}
// Unit tests for histogram construction, bucket validation, the bucket helper
// functions and the duration conversion.
#[cfg(test)]
mod tests {
use std::thread;
use std::f64::{EPSILON, INFINITY};
use std::time::Duration;
use metrics::Collector;
use super::*;
// Exercises direct observation, both ways of stopping a timer, and custom buckets.
#[test]
fn test_histogram() {
let opts = HistogramOpts::new("test1", "test help")
.const_label("a", "1")
.const_label("b", "2");
let histogram = Histogram::with_opts(opts).unwrap();
histogram.observe(1.0);
// Timer stopped explicitly.
let timer = histogram.start_timer();
thread::sleep(Duration::from_millis(100));
timer.observe_duration();
// Timer stopped implicitly when it goes out of scope.
{
let _timer = histogram.start_timer();
thread::sleep(Duration::from_millis(400));
}
let mf = histogram.collect();
let m = mf.get_metric().as_ref().get(0).unwrap();
assert_eq!(m.get_label().len(), 2);
let proto_histogram = m.get_histogram();
// One direct observation plus two timer observations.
assert_eq!(proto_histogram.get_sample_count(), 3);
// 1.0 observed directly, plus at least 0.1s and 0.4s of sleeping.
assert!(proto_histogram.get_sample_sum() >= 1.5);
assert_eq!(proto_histogram.get_bucket().len(), DEFAULT_BUCKETS.len());
// A fresh histogram with custom buckets and no observations.
let buckets = vec![1.0, 2.0, 3.0];
let opts = HistogramOpts::new("test2", "test help").buckets(buckets.clone());
let histogram = Histogram::with_opts(opts).unwrap();
let mf = histogram.collect();
let m = mf.get_metric().as_ref().get(0).unwrap();
assert_eq!(m.get_label().len(), 0);
let proto_histogram = m.get_histogram();
assert_eq!(proto_histogram.get_sample_count(), 0);
// NOTE(review): there is no `.abs()` here, so any negative sum would also pass.
assert!((proto_histogram.get_sample_sum() - 0.0) < EPSILON);
assert_eq!(proto_histogram.get_bucket().len(), buckets.len())
}
// Table rows: (input buckets, expected `is_ok`, expected length of the adjusted buckets).
#[test]
fn test_buckets_invalidation() {
let table = vec![
(vec![], true, DEFAULT_BUCKETS.len()),
(vec![-2.0, -1.0, -0.5, 0.0, 0.5, 1.0, 2.0], true, 7),
(vec![-2.0, -1.0, -0.5, 10.0, 0.5, 1.0, 2.0], false, 7),
(vec![-2.0, -1.0, -0.5, 0.0, 0.5, 1.0, INFINITY], true, 6),
];
for (buckets, is_ok, length) in table {
let got = check_and_adjust_buckets(buckets);
assert_eq!(got.is_ok(), is_ok);
if is_ok {
assert_eq!(got.unwrap().len(), length);
}
}
}
// Table rows: (start, width or factor, count, expected `is_ok`, expected buckets).
#[test]
fn test_buckets_functions() {
let linear_table = vec![
(-15.0, 5.0, 6, true, vec![-15.0, -10.0, -5.0, 0.0, 5.0, 10.0]),
(-15.0, 0.0, 6, false, vec![]),
(-15.0, 5.0, 0, false, vec![]),
];
for (param1, param2, param3, is_ok, vec) in linear_table {
let got = linear_buckets(param1, param2, param3);
assert_eq!(got.is_ok(), is_ok);
if got.is_ok() {
assert_eq!(got.unwrap(), vec);
}
}
let exponential_table = vec![
(100.0, 1.2, 3, true, vec![100.0, 120.0, 144.0]),
(100.0, 0.5, 3, false, vec![]),
(100.0, 1.2, 0, false, vec![]),
];
for (param1, param2, param3, is_ok, vec) in exponential_table {
let got = exponential_buckets(param1, param2, param3);
assert_eq!(got.is_ok(), is_ok);
if got.is_ok() {
assert_eq!(got.unwrap(), vec);
}
}
}
// Table rows: (milliseconds, expected seconds).
#[test]
fn test_duration_to_seconds() {
let tbls = vec![(1000, 1.0), (1100, 1.1), (100111, 100.111)];
for (millis, seconds) in tbls {
let d = Duration::from_millis(millis);
let v = duration_to_seconds(d);
assert!((v - seconds).abs() < EPSILON);
}
}
}