use actix::prelude::*;
use config;
use histogram::Histogram;
use serde_derive::{Deserialize, Serialize};
use serde_json as json;
use std::cell::RefCell;
use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeMap;
use std::fs;
use std::hash::{Hash, Hasher};
use std::time::Instant;
use crate::common::logger::*;
use crate::service::config::MetricConfig;
pub mod backend;
/// Target name a fresh `Root` starts with (empty).
static TARGET_NAME: &str = "";
/// Log target passed to this module's `trace!`/`error!` calls.
static TARGET_METRIC: &str = "metric";
/// Prefix a fresh `Root` starts with.
static TEMPEST_NAME: &str = "tempest";
/// Name part attached to Prometheus histogram bucket entries.
static HISTOGRAM_BUCKET: &str = "bucket";
/// Label key attached to Prometheus histogram bucket entries.
static HISTOGRAM_LE: &str = "le";
// Per-thread root metric configuration. Each thread gets its own `Root`,
// initialised with `Root::default()`; the `Root::*` associated functions
// read and mutate it through the `RefCell`.
thread_local! {
static ROOT: RefCell<Root> = RefCell::new(Root::default());
}
/// Converts a vector of string slices into owned `String`s, keeping the order.
fn string_vec(v: Vec<&str>) -> Vec<String> {
    v.into_iter().map(String::from).collect()
}
/// Converts borrowed `(key, value)` label pairs into owned pairs, keeping the order.
fn string_label_vec(v: Vec<(&str, &str)>) -> Vec<(String, String)> {
    let mut owned = Vec::with_capacity(v.len());
    for (key, value) in v {
        owned.push((String::from(key), String::from(value)));
    }
    owned
}
/// Applies optional settings to this thread's `Root`.
///
/// A `Some` flush interval replaces the stored one; `Some` targets are
/// appended to the targets already registered. `None` leaves the
/// corresponding setting untouched.
pub(crate) fn configure(flush_interval: Option<u64>, targets: Option<Vec<MetricTarget>>) {
    if let Some(interval) = flush_interval {
        Root::flush_interval(interval);
    }

    if let Some(list) = targets {
        Root::targets(list);
    }
}
pub(crate) fn parse_metrics_config(cfgs: MetricConfig) {
let mut targets = vec![];
for target in cfgs.target.iter() {
match parse_config_value(target.to_owned()) {
Some(t) => {
trace!(target: TARGET_METRIC, "Configure metric {:?} target", &t);
targets.push(t);
}
None => {}
}
}
configure(cfgs.flush_interval, Some(targets));
}
/// Tries to deserialize a raw config value into a [`MetricTarget`].
///
/// Returns `None` when deserialization fails, after logging the error.
fn parse_config_value(cfg: config::Value) -> Option<MetricTarget> {
    cfg.try_into::<MetricTarget>()
        .map_err(|err| {
            error!(target: TARGET_METRIC, "Error {:?}", err);
        })
        .ok()
}
/// A destination that metrics can be configured to go to.
///
/// Deserialized as an internally tagged value: the `type` field selects the
/// variant by its lowercase name (`console`, `statsd`, `prometheus`, `file`,
/// `log`). Every variant carries an optional `prefix`.
#[derive(Clone, Debug, PartialEq, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum MetricTarget {
/// Console target.
Console { prefix: Option<String> },
/// Statsd target identified by `host`.
Statsd {
host: String,
prefix: Option<String>,
},
/// Prometheus target identified by `uri`.
Prometheus { uri: String, prefix: Option<String> },
/// File target writing to `path`.
/// NOTE(review): `clobber` presumably controls overwriting an existing
/// file — confirm against the backend.
File {
path: String,
clobber: Option<bool>,
prefix: Option<String>,
},
/// Log target with an optional log `level`.
Log {
level: Option<MetricLogLevel>,
prefix: Option<String>,
},
}
impl MetricTarget {
pub fn console(prefix: Option<String>) -> Self {
Self::Console { prefix: prefix }
}
pub fn statsd(host: String, prefix: Option<String>) -> Self {
Self::Statsd {
host: host,
prefix: prefix,
}
}
pub fn prometheus(uri: String, prefix: Option<String>) -> Self {
Self::Prometheus {
uri: uri,
prefix: prefix,
}
}
pub fn file(path: String, clobber: Option<bool>, prefix: Option<String>) -> Self {
Self::File {
path: path,
clobber: clobber,
prefix: prefix,
}
}
pub fn log(level: Option<MetricLogLevel>, prefix: Option<String>) -> Self {
Self::Log {
level: level,
prefix: prefix,
}
}
}
/// Log level selectable for a `MetricTarget::Log` target.
///
/// Deserialized from the lowercase variant name (`error`, `warn`, `info`,
/// `debug`, `trace`).
#[derive(Clone, Debug, PartialEq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum MetricLogLevel {
Error,
Warn,
Info,
Debug,
Trace,
}
impl MetricLogLevel {
fn to_level(&self) -> log::Level {
match self {
MetricLogLevel::Error => log::Level::Error,
MetricLogLevel::Warn => log::Level::Warn,
MetricLogLevel::Info => log::Level::Info,
MetricLogLevel::Debug => log::Level::Debug,
MetricLogLevel::Trace => log::Level::Trace,
}
}
}
/// Root metric configuration, stored once per thread in `ROOT`.
#[derive(Clone)]
pub(crate) struct Root {
/// Target name; defaults to `TARGET_NAME`.
pub target_name: String,
/// Root prefix; defaults to `TEMPEST_NAME`.
pub prefix: String,
/// Root labels; `None` until the first label is added.
pub labels: Labels,
/// Configured metric targets.
pub targets: Vec<MetricTarget>,
/// Flush interval; defaults to 5000.
/// NOTE(review): `configure` binds this value as `ms`, so presumably
/// milliseconds — confirm against the consumer.
pub flush_interval: u64,
}
impl Default for Root {
    /// Root with the module's default names, no labels, no targets and a
    /// flush interval of 5000.
    fn default() -> Self {
        Self {
            target_name: String::from(TARGET_NAME),
            prefix: String::from(TEMPEST_NAME),
            labels: None,
            targets: Vec::new(),
            flush_interval: 5000,
        }
    }
}
/// Accessors and mutators for the thread-local `ROOT` configuration.
///
/// The associated functions without a `self` parameter operate on the calling
/// thread's `ROOT`; `add_label` and `add_target` operate on a borrowed `Root`.
impl Root {
    /// Appends every label in `labels` to this thread's root labels.
    pub fn labels(labels: Vec<Label>) {
        ROOT.with(|root| {
            // One mutable borrow covers the whole batch.
            let mut r = root.borrow_mut();
            for (key, value) in labels {
                r.add_label(key, value);
            }
        });
    }

    /// Appends a single label to this thread's root labels.
    #[allow(dead_code)]
    pub fn label(key: String, value: String) {
        ROOT.with(|root| {
            root.borrow_mut().add_label(key, value);
        });
    }

    /// Replaces this thread's root target name.
    pub fn target_name(name: String) {
        ROOT.with(|root| {
            root.borrow_mut().target_name = name;
        });
    }

    /// Appends `targets` to this thread's configured targets.
    pub fn targets(targets: Vec<MetricTarget>) {
        ROOT.with(|root| {
            let mut r = root.borrow_mut();
            for target in targets {
                r.add_target(target);
            }
        });
    }

    /// Replaces this thread's flush interval.
    pub fn flush_interval(value: u64) {
        ROOT.with(|root| {
            root.borrow_mut().flush_interval = value;
        });
    }

    /// Returns a copy of the root prefix.
    fn get_prefix() -> String {
        ROOT.with(|root| root.borrow().prefix.clone())
    }

    /// Returns a copy of the root labels.
    fn get_labels() -> Labels {
        ROOT.with(|root| root.borrow().labels.clone())
    }

    /// Returns a copy of the root target name.
    fn get_target_name() -> String {
        ROOT.with(|root| root.borrow().target_name.clone())
    }

    /// Returns a copy of the configured targets.
    fn get_targets() -> Vec<MetricTarget> {
        ROOT.with(|root| root.borrow().targets.clone())
    }

    /// Returns how many targets are configured.
    pub fn get_targets_len() -> usize {
        ROOT.with(|root| root.borrow().targets.len())
    }

    /// Returns the flush interval.
    fn get_flush_interval() -> u64 {
        ROOT.with(|root| root.borrow().flush_interval)
    }

    /// Appends a label, creating the label list on first use.
    fn add_label(&mut self, key: String, value: String) {
        // Push in place instead of cloning the whole vector per insertion.
        self.labels.get_or_insert_with(Vec::new).push((key, value));
    }

    /// Appends a target.
    fn add_target(&mut self, target: MetricTarget) {
        self.targets.push(target);
    }
}
/// Read-only wrapper around an [`AggregateMetrics`] snapshot.
#[derive(Debug)]
pub struct TestMetrics {
    aggregate: AggregateMetrics,
}

impl TestMetrics {
    /// Wraps `aggregate`.
    pub fn new(aggregate: AggregateMetrics) -> Self {
        TestMetrics { aggregate }
    }

    /// Looks up the counter stored under `key`, if any.
    pub fn get(&self, key: &str) -> Option<&isize> {
        self.aggregate.counters.get(key)
    }
}
/// Named counter values, serializable with serde and usable as an actix
/// `Message`.
#[derive(Serialize, Deserialize, Debug, Message, Default, Clone)]
pub struct AggregateMetrics {
/// Counter values keyed by name.
pub counters: BTreeMap<String, isize>,
}
static TMP_AGGREGATE_METRICS: &'static str = "/tmp/aggregate-metrics-agent";
impl AggregateMetrics {
fn get_file_name(suffix: &String) -> String {
format!("{}-{}", TMP_AGGREGATE_METRICS, suffix)
}
pub fn read_tmp(suffix: &String) -> Self {
let filename = AggregateMetrics::get_file_name(suffix);
let aggregate_metrics = match fs::read(&filename.as_str()) {
Ok(buf) => match json::from_slice::<AggregateMetrics>(&buf) {
Ok(aggregate) => aggregate,
Err(_err) => AggregateMetrics::default(),
},
Err(err) => {
error!(
"Error reading aggregate metrics file: {:?} {:?}",
TMP_AGGREGATE_METRICS, &err,
);
AggregateMetrics::default()
}
};
let _ = fs::remove_file(&filename.as_str());
aggregate_metrics
}
pub fn write_tmp(&self, suffix: &String) {
let filename = AggregateMetrics::get_file_name(suffix);
let body = json::to_string(&self).unwrap();
match fs::write(&filename.as_str(), body) {
Err(err) => {
error!("Error writing aggregate metrics: {:?}", &err);
}
_ => {}
}
}
pub fn insert(&mut self, key: String, value: isize) {
self.counters.insert(key, value);
}
pub fn get(&self, key: &str) -> Option<&isize> {
self.counters.get(key)
}
}
/// A named group of metrics collected into a `Bucket` until flushed.
#[derive(Clone)]
pub struct Metrics {
/// Name parts for this group.
pub names: Vec<String>,
/// Labels for this group; `None` until the first label is added.
pub labels: Labels,
/// Metrics recorded since the last flush.
pub bucket: Bucket,
}
impl Default for Metrics {
fn default() -> Self {
Metrics::new(vec![])
}
}
impl Metrics {
    /// Creates a `Metrics` with the given name parts, no labels and an empty
    /// bucket.
    pub fn new(names: Vec<&str>) -> Self {
        Metrics {
            names: string_vec(names),
            labels: None,
            bucket: Bucket::default(),
        }
    }

    /// Replaces the name parts and returns a clone of the updated value.
    pub fn named(&mut self, names: Vec<&str>) -> Self {
        self.names = string_vec(names);
        self.clone()
    }

    /// Appends `labels` and returns a clone of the updated value.
    pub fn labels(&mut self, labels: Vec<(&str, &str)>) -> Self {
        for (key, value) in labels {
            self.add_label(key, value);
        }
        self.clone()
    }

    /// Appends a single label, creating the label list on first use.
    pub fn add_label(&mut self, key: &str, value: &str) {
        // Push in place instead of cloning the whole vector per insertion.
        self.labels
            .get_or_insert_with(Vec::new)
            .push((key.to_string(), value.to_string()));
    }

    /// Hands the recorded metrics to the backend actor and empties the bucket.
    ///
    /// Does nothing (beyond a trace log) when the bucket is empty.
    pub fn flush(&mut self) {
        if self.bucket.map.is_empty() {
            trace!("Empty metrics bucket. Skip flush: {:?}", &self.names);
            return;
        }
        // Move the bucket out rather than deep-cloning it and then clearing
        // the original; `self.bucket` is left empty either way.
        let metrics = Metrics {
            names: self.names.clone(),
            labels: self.labels.clone(),
            bucket: std::mem::replace(&mut self.bucket, Bucket::default()),
        };
        let backend_metrics = backend::MetricsBackendActor::from_registry();
        // NOTE(review): the bucket is emptied even when the backend is not
        // connected, so those samples are dropped — confirm this is intended.
        if backend_metrics.connected() {
            backend_metrics.do_send(backend::Msg {
                root_prefix: Root::get_prefix(),
                root_labels: Root::get_labels(),
                metrics,
            });
        }
    }

    /// Adds `value` to the counter identified by `names`.
    pub fn counter(&mut self, names: Vec<&str>, value: isize) {
        self.bucket.metric(names, MetricKind::Counter).add(value);
    }

    /// Adds 1 to the counter identified by `names`.
    pub fn incr(&mut self, names: Vec<&str>) {
        self.bucket.metric(names, MetricKind::Counter).add(1);
    }

    /// Subtracts 1 from the counter identified by `names`.
    pub fn decr(&mut self, names: Vec<&str>) {
        self.bucket.metric(names, MetricKind::Counter).add(-1);
    }

    /// Sets the gauge identified by `names` to `value`.
    pub fn gauge(&mut self, names: Vec<&str>, value: isize) {
        self.bucket.metric(names, MetricKind::Gauge).set(value);
    }

    /// Adds `value` to the counter identified by `names` and `labels`.
    pub fn counter_labels(&mut self, names: Vec<&str>, value: isize, labels: Vec<(&str, &str)>) {
        self.bucket
            .metric_labels(names, labels, MetricKind::Counter)
            .add(value);
    }

    /// Adds 1 to the counter identified by `names` and `labels`.
    pub fn incr_labels(&mut self, names: Vec<&str>, labels: Vec<(&str, &str)>) {
        self.bucket
            .metric_labels(names, labels, MetricKind::Counter)
            .add(1);
    }

    /// Subtracts 1 from the counter identified by `names` and `labels`.
    pub fn decr_labels(&mut self, names: Vec<&str>, labels: Vec<(&str, &str)>) {
        self.bucket
            .metric_labels(names, labels, MetricKind::Counter)
            .add(-1);
    }

    /// Sets the gauge identified by `names` and `labels` to `value`.
    pub fn gauge_labels(&mut self, names: Vec<&str>, value: isize, labels: Vec<(&str, &str)>) {
        self.bucket
            .metric_labels(names, labels, MetricKind::Gauge)
            .set(value);
    }

    /// Starts a new timer at the current instant.
    pub fn timer(&mut self) -> SimpleTimer {
        SimpleTimer::default()
    }

    /// Stops `timer` and records it under the timer metric `names`.
    pub fn time(&mut self, names: Vec<&str>, mut timer: SimpleTimer) {
        timer.stop();
        self.bucket.metric(names, MetricKind::Timer).timer(timer);
    }

    /// Stops `timer` and records it under the timer metric identified by
    /// `names` and `labels`.
    pub fn time_labels(
        &mut self,
        names: Vec<&str>,
        mut timer: SimpleTimer,
        labels: Vec<(&str, &str)>,
    ) {
        timer.stop();
        self.bucket
            .metric_labels(names, labels, MetricKind::Timer)
            .timer(timer);
    }
}
/// Metrics recorded since the last flush, keyed by the id that
/// `Metric::to_hash` derives from a metric's names and labels.
#[derive(Default, Clone)]
pub struct Bucket {
map: BTreeMap<MetricId, Metric>,
}
impl Bucket {
    /// Returns the unlabelled metric identified by `names`, creating it with
    /// `kind` when it does not exist yet.
    ///
    /// An existing entry is returned as-is; `kind` is only used on creation.
    fn metric(&mut self, names: Vec<&str>, kind: MetricKind) -> &mut Metric {
        let id = Metric::to_hash(&names, &None);
        // The entry API does a single lookup and needs no `unwrap`.
        self.map
            .entry(id)
            .or_insert_with(|| Metric::new(id, names, kind))
    }

    /// Returns the metric identified by `names` and `labels`, creating it
    /// with `kind` when it does not exist yet.
    ///
    /// An existing entry is returned as-is; `kind` is only used on creation.
    fn metric_labels(
        &mut self,
        names: Vec<&str>,
        labels: Vec<(&str, &str)>,
        kind: MetricKind,
    ) -> &mut Metric {
        let id = Metric::to_hash(&names, &Some(&labels));
        self.map
            .entry(id)
            .or_insert_with(|| Metric::new_labels(id, names, labels, kind))
    }

    /// Removes every recorded metric.
    fn clear(&mut self) {
        self.map.clear();
    }
}
/// A single recorded metric: its identity (id, names, labels), its kind and
/// its current value.
#[derive(Clone)]
struct Metric {
/// Id computed by `Metric::to_hash` from the names and labels.
id: u64,
/// Name parts of the metric.
names: Vec<String>,
/// Labels of the metric; `None` when it was created without labels.
labels: Labels,
/// Kind the metric was created with.
kind: MetricKind,
/// Current value; its variant is chosen from `kind` at creation.
value: MetricValue,
}
impl Metric {
/// Returns the initial value for a metric of the given kind: a default
/// counter, a gauge of 0, or a default timer.
fn default_value(kind: &MetricKind) -> MetricValue {
    match *kind {
        MetricKind::Timer => MetricValue::Timer(Timer::default()),
        MetricKind::Gauge => MetricValue::Gauge(0),
        MetricKind::Counter => MetricValue::Counter(Counter::default()),
    }
}
/// Creates an unlabelled metric holding the initial value for `kind`.
fn new(id: u64, names: Vec<&str>, kind: MetricKind) -> Self {
    let value = Metric::default_value(&kind);
    Metric {
        id,
        names: string_vec(names),
        labels: None,
        kind,
        value,
    }
}
/// Creates a labelled metric holding the initial value for `kind`.
fn new_labels(id: u64, names: Vec<&str>, labels: Vec<(&str, &str)>, kind: MetricKind) -> Self {
    let value = Metric::default_value(&kind);
    let labels = Some(string_label_vec(labels));
    Metric {
        id,
        names: string_vec(names),
        labels,
        kind,
        value,
    }
}
fn to_value(&mut self, format: MetricFormat) -> FormatedMetric {
match &self.value {
MetricValue::Counter(counter) => {
let _v = counter.value.to_string();
match format {
MetricFormat::Standard => FormatedMetric::Standard(_v),
MetricFormat::Statsd => FormatedMetric::Statsd(_v),
MetricFormat::Prometheus => FormatedMetric::Prometheus(vec![(None, None, _v)]),
}
}
MetricValue::Gauge(v) => {
let _v = v.to_string();
match format {
MetricFormat::Standard => FormatedMetric::Standard(_v),
MetricFormat::Statsd => FormatedMetric::Statsd(_v),
MetricFormat::Prometheus => FormatedMetric::Prometheus(vec![(None, None, _v)]),
}
}
MetricValue::Timer(timer) => {
match format {
MetricFormat::Standard => {
FormatedMetric::Standard(timer.simple.value.to_string())
}
MetricFormat::Statsd => {
FormatedMetric::Statsd(timer.simple.value_as_ms().to_string())
}
MetricFormat::Prometheus => {
let mut values = vec![];
let percentiles = vec![
0.05f64, 0.1f64, 0.25f64, 0.5f64, 0.75f64, 0.90f64, 0.95f64, 0.99f64,
];
let labels = vec![
"0.05", "0.1", "0.2", "0.25", "0.5", "0.75", "0.9", "0.95", "0.99",
];
for (i, p) in percentiles.iter().enumerate() {
match timer.histogram.percentile(*p) {
Ok(v) => {
values.push((
Some(HISTOGRAM_BUCKET),
Some((HISTOGRAM_LE, labels[i])),
v.to_string(),
));
}
Err(err) => {
error!(
"Error packing prometheus histogram values for key: {:?}",
&err
);
}
}
}
let count = timer.simple.counter.value.to_string();
values.push((
Some(HISTOGRAM_BUCKET),
Some((HISTOGRAM_LE, "+Inf")),
count.clone(),
));
values.push((Some("sum"), None, timer.simple.value.to_string()));
values.push((Some("count"), None, count));
FormatedMetric::Prometheus(values)
}
}
}
}
}
/// Computes the id used to key a metric in a `Bucket` from its names and
/// optional labels.
///
/// The number of names is hashed first so that the boundary between names
/// and labels is unambiguous: without it `["a", "b", "c"]` with no labels and
/// `["a"]` with the label `("b", "c")` feed the hasher the same sequence and
/// end up sharing one metric. `None` and an empty label list still hash
/// alike.
fn to_hash(names: &Vec<&str>, labels: &Option<&Vec<(&str, &str)>>) -> u64 {
    let mut s = DefaultHasher::new();
    names.len().hash(&mut s);
    for name in names.iter() {
        name.hash(&mut s);
    }
    if let Some(values) = labels {
        for (key, value) in values.iter() {
            key.hash(&mut s);
            value.hash(&mut s);
        }
    }
    s.finish()
}
/// Adds `value` to this metric when it holds a counter; any other kind of
/// value is left untouched.
fn add(&mut self, value: isize) {
    if let MetricValue::Counter(counter) = &mut self.value {
        counter.add(value);
    }
}
/// Overwrites this metric with `value` when it holds a gauge; any other kind
/// of value is left untouched.
fn set(&mut self, value: isize) {
    if let MetricValue::Gauge(current) = &mut self.value {
        *current = value;
    }
}
/// Records the stopped timer `simple` when this metric holds a timer; any
/// other kind of value is left untouched.
fn timer(&mut self, simple: SimpleTimer) {
    // Update in place: cloning the `Timer` would copy its whole histogram
    // for every recorded sample.
    if let MetricValue::Timer(timer) = &mut self.value {
        timer.incr(simple);
    }
}
}
/// Key type of `Bucket::map`; produced by `Metric::to_hash`.
type MetricId = u64;
/// A single `(key, value)` label pair.
pub(crate) type Label = (String, String);
/// An optional list of labels; `None` means no label was ever added.
pub(crate) type Labels = Option<Vec<Label>>;
/// The kind of a recorded metric.
#[derive(Clone)]
pub(crate) enum MetricKind {
    Counter,
    Gauge,
    Timer,
}

impl MetricKind {
    /// Capitalised kind name: `Counter`, `Gauge` or `Timer`.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::Counter => "Counter",
            Self::Gauge => "Gauge",
            Self::Timer => "Timer",
        }
    }

    /// Prometheus type name; timers map to `histogram`.
    fn as_prom_str(&self) -> &'static str {
        match *self {
            Self::Counter => "counter",
            Self::Gauge => "gauge",
            Self::Timer => "histogram",
        }
    }

    /// Statsd type suffix: `c`, `g` or `ms`.
    fn as_statsd_str(&self) -> &'static str {
        match *self {
            Self::Counter => "c",
            Self::Gauge => "g",
            Self::Timer => "ms",
        }
    }
}
/// Output format requested from `Metric::to_value`.
pub(crate) enum MetricFormat {
Standard,
Prometheus,
Statsd,
}
/// A metric value rendered by `Metric::to_value` for one output format.
pub(crate) enum FormatedMetric {
/// Value text for the standard format.
Standard(String),
/// One entry per emitted Prometheus value. As built by
/// `Metric::to_value`, each tuple is: an optional name part (e.g. `bucket`,
/// `sum`, `count`), an optional `(label key, label value)` pair, and the
/// value text.
Prometheus(
Vec<(
Option<&'static str>,
Option<(&'static str, &'static str)>,
String,
)>,
),
/// Value text for the statsd format.
Statsd(String),
}
/// Current value of a metric; the variant corresponds to its `MetricKind`.
#[derive(Clone)]
pub(crate) enum MetricValue {
/// Running counter total.
Counter(Counter),
/// Last value the gauge was set to.
Gauge(isize),
/// Accumulated timer data.
Timer(Timer),
}
/// A signed running total that starts at 0.
#[derive(Debug, Clone, Copy, Default)]
pub(crate) struct Counter {
    pub value: isize,
}

impl Counter {
    /// Adds `value` (which may be negative) to the running total.
    fn add(&mut self, value: isize) {
        self.value = self.value + value;
    }
}
/// Accumulated timing data: a running total with sample count (`simple`)
/// and a histogram of the individual samples.
#[derive(Clone, Default)]
pub struct Timer {
    pub simple: SimpleTimer,
    pub histogram: Histogram,
}

impl Timer {
    /// Records the elapsed nanoseconds of `simple` in both the running total
    /// and the histogram. A histogram error is ignored.
    fn incr(&mut self, simple: SimpleTimer) {
        let elapsed = simple.elapsed_ns();
        self.simple.incr(elapsed);
        let _ = self.histogram.increment(elapsed);
    }
}
/// A start/stop stopwatch that doubles as an accumulator.
///
/// `start`/`end` measure a single interval; `value`/`counter` accumulate the
/// durations passed to [`SimpleTimer::incr`] and how many there were.
#[derive(Debug, Clone)]
pub struct SimpleTimer {
    /// Instant the timer was created.
    start: Instant,
    /// Instant of the last `stop` call (equal to `start` until then).
    end: Instant,
    /// Number of samples added through `incr`.
    counter: Counter,
    /// Sum of the samples added through `incr`; `Timer::incr` feeds it
    /// nanoseconds.
    value: u64,
}

impl SimpleTimer {
    /// Creates a timer whose start and end are both the current instant.
    fn now() -> Self {
        let now = Instant::now();
        SimpleTimer {
            start: now,
            end: now,
            value: 0u64,
            counter: Counter::default(),
        }
    }

    /// Adds `value` to the accumulated total and counts one more sample.
    pub fn incr(&mut self, value: u64) {
        self.value += value;
        self.counter.add(1isize);
    }

    /// Marks the current instant as the end of the measured interval.
    pub fn stop(&mut self) {
        self.end = Instant::now();
    }

    /// Measured interval (`end - start`) in nanoseconds.
    pub fn elapsed_ns(&self) -> u64 {
        let duration = self.end - self.start;
        (duration.as_secs() * 1_000_000_000) + u64::from(duration.subsec_nanos())
    }

    /// Measured interval (`end - start`) in whole microseconds.
    pub fn elapsed_us(&self) -> u64 {
        let duration = self.end - self.start;
        (duration.as_secs() * 1_000_000) + u64::from(duration.subsec_micros())
    }

    /// Measured interval (`end - start`) in whole milliseconds.
    pub fn elapsed_ms(&self) -> u64 {
        self.elapsed_us() / 1000
    }

    /// Accumulated total converted from nanoseconds to whole milliseconds.
    pub fn value_as_ms(&self) -> u64 {
        // 1 ms = 1_000_000 ns. Dividing by 1_000_000_000 yields seconds,
        // which rounds every sub-second total down to 0.
        self.value / 1_000_000
    }
}

impl Default for SimpleTimer {
    /// Same as a timer started at the current instant.
    fn default() -> Self {
        SimpleTimer::now()
    }
}