use std::collections::HashMap;
use std::fmt;
use std::ops::Deref;
use std::panic::RefUnwindSafe;
use std::sync::atomic::{AtomicIsize, Ordering};
use std::sync::Arc;
use std::sync::RwLock;
#[cfg(feature = "interval_logging")]
use std::time::Duration;
use slog::info;
/// Describes a single statistic: its name, description, kind, the tags it is
/// grouped by and, for bucketed statistics, its bucket configuration.
///
/// Implementations are generated by the `define_stats!` macro in this file.
pub trait StatDefinition: fmt::Debug {
/// The name of the statistic. `StatsTracker` uses this as the lookup key.
fn name(&self) -> &'static str;
/// A human-readable description of the statistic.
fn description(&self) -> &'static str;
/// The kind of statistic (see `StatType`).
fn stype(&self) -> StatType;
/// The names of the tags whose values are used to group recorded values.
/// An empty list means the statistic is not grouped.
fn group_by(&self) -> Vec<&'static str>;
/// The bucket configuration. Must be `Some` when `stype()` is
/// `StatType::BucketCounter`; `StatTypeData::new` panics otherwise.
fn buckets(&self) -> Option<Buckets>;
}
/// Defines a static list of statistic definitions.
///
/// For each listed stat the macro generates a unit struct inside a private
/// `inner_stats` module, a `static` of that type named after the stat, and a
/// `StatDefinition` implementation for it. `$name` becomes a
/// `StatDefinitions` slice referencing every generated static.
///
/// Accepted per-stat forms (matched by the `@single` rules below):
///
/// ```ignore
/// define_stats! {
///     MY_STATS = {
///         // type, description, [group-by tags]
///         RequestCount(Counter, "Number of requests", ["method"]),
///         // BucketCounter additionally takes (method, bucket label, [limits])
///         RequestSize(BucketCounter, "Request sizes", ["method"], (Freq, "size", [10, 100]))
///     }
/// }
/// ```
///
/// NOTE(review): every invocation emits a module called `inner_stats`, so two
/// invocations in the same module would presumably collide — confirm.
#[macro_export]
macro_rules! define_stats {
// Entry point: a named list of `Stat(details...)` items.
($name:ident = {$($stat:ident($($details:tt),*)),*}) => {
pub static $name: $crate::stats::StatDefinitions = &[$(&$stat),*];
mod inner_stats {
$(
#[derive(Debug, Clone)]
#[allow(non_camel_case_types)]
pub struct $stat;
)*
}
$(
$crate::define_stats!{@single $stat, $($details),*}
)*
};
// A bucket counter: carries the bucket method, bucket label and limits.
(@single $stat:ident, BucketCounter, $desc:expr, [$($tags:tt),*], ($bmethod:ident, $blabel:expr, [$($blimits:expr),*]) ) => {
$crate::define_stats!{@inner $stat, BucketCounter, $desc, $bmethod, $blabel, [$($tags),*], [$($blimits),*]}
};
// Any other stat type: placeholder bucket settings (`Freq`, empty label, no limits).
(@single $stat:ident, $stype:ident, $desc:expr, [$($tags:tt),*] ) => {
$crate::define_stats!{@inner $stat, $stype, $desc, Freq, "", [$($tags),*], []}
};
// Variant with an extra `$id` expression before the description; `$id` is
// accepted but not used in the expansion.
(@single $stat:ident, $stype:ident, $id:expr, $desc:expr, [$($tags:tt),*] ) => {
$crate::define_stats!{@inner $stat, $stype, $desc, Freq, "", [$($tags),*], []}
};
// Emits the static instance and the `StatDefinition` impl for one stat.
(@inner $stat:ident, $stype:ident, $desc:expr, $bmethod:ident, $blabel:expr, [$($tags:tt),*], [$($blimits:expr),*]) => {
#[allow(non_upper_case_globals)]
static $stat : inner_stats::$stat = inner_stats::$stat;
impl $crate::stats::StatDefinition for inner_stats::$stat {
fn name(&self) -> &'static str { stringify!($stat) }
fn description(&self) -> &'static str { $desc }
fn stype(&self) -> $crate::stats::StatType { $crate::stats::StatType::$stype }
fn group_by(&self) -> Vec<&'static str> { vec![$($tags),*] }
fn buckets(&self) -> Option<$crate::stats::Buckets> {
// Only bucket counters expose buckets; every limit is cast to `i64`.
match self.stype() {
$crate::stats::StatType::BucketCounter => {
Some($crate::stats::Buckets::new($crate::stats::BucketMethod::$bmethod,
$blabel,
&[$($blimits as i64),* ],
))
},
_ => None
}
}
}
};
}
/// A statistic definition paired with a fixed set of tag name/value pairs.
pub struct StatDefinitionTagged {
/// The underlying statistic definition.
pub defn: &'static (dyn StatDefinition + Sync),
/// Fixed `(name, value)` tag pairs for this use of the statistic.
/// NOTE(review): not read anywhere in this file; presumably consumed by
/// `StatTrigger` implementations — confirm.
pub fixed_tags: &'static [(&'static str, &'static str)],
}
/// Something (typically a log) that can cause statistics to be updated.
///
/// `StatsTracker::update_stats` walks `stat_list()` and, for each entry whose
/// `condition` is true, applies `change` to the matching statistic.
pub trait StatTrigger {
/// The statistics this trigger may update.
fn stat_list(&self) -> &[StatDefinitionTagged];
/// Whether the given statistic should be updated by this trigger.
/// Defaults to `false`.
fn condition(&self, _stat_id: &StatDefinitionTagged) -> bool {
false
}
/// The value of the named tag for this trigger; used to build the grouping
/// key of grouped statistics.
fn tag_value(&self, stat_id: &StatDefinitionTagged, _tag_name: &'static str) -> String;
/// The change to apply to the statistic. Defaults to `None`; the update
/// code in this file panics ("Bad log definition") if this is `None` for a
/// statistic whose `condition` returned `true`.
fn change(&self, _stat_id: &StatDefinitionTagged) -> Option<ChangeType> {
None
}
/// The value used to choose which bucket(s) of a bucket counter to update.
/// Defaults to `None`; bucket counter updates panic if this is `None`.
fn bucket_value(&self, _stat_id: &StatDefinitionTagged) -> Option<f64> {
None
}
}
/// How a trigger modifies a statistic's value (applied by `StatValue::update`).
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub enum ChangeType {
/// Add the given amount to the current value.
Incr(usize),
/// Subtract the given amount from the current value.
Decr(usize),
/// Replace the current value with the given one.
SetTo(isize),
}
/// The upper limit of a bucket.
///
/// The derived ordering follows declaration order, so every `Num` compares
/// less than `Unbounded`; `Buckets::assign_buckets` relies on this.
#[derive(Debug, Clone, Copy, serde::Serialize, PartialEq, PartialOrd, Eq, Ord)]
pub enum BucketLimit {
/// A finite, inclusive upper limit.
Num(i64),
/// No upper limit; matches any value.
Unbounded,
}
/// slog serialization for bucket limits: finite limits are emitted as an
/// `i64`, the open-ended limit as the string `"Unbounded"`.
impl slog::Value for BucketLimit {
    fn serialize(
        &self,
        _record: &::slog::Record<'_>,
        key: ::slog::Key,
        serializer: &mut dyn (::slog::Serializer),
    ) -> ::slog::Result {
        match self {
            Self::Num(limit) => serializer.emit_i64(key, *limit),
            Self::Unbounded => serializer.emit_str(key, "Unbounded"),
        }
    }
}
/// Text form of a bucket limit: the number itself for finite limits,
/// `Unbounded` for the open-ended one.
impl fmt::Display for BucketLimit {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            BucketLimit::Num(limit) => write!(f, "{}", limit),
            BucketLimit::Unbounded => f.write_str("Unbounded"),
        }
    }
}
/// The bucket configuration of a bucket counter statistic.
#[derive(Debug, Clone, serde::Serialize, PartialEq)]
pub struct Buckets {
/// How a value is assigned to buckets (see `BucketMethod`).
pub method: BucketMethod,
/// The tag name under which the bucket limit is reported.
pub label_name: &'static str,
/// The bucket limits; `Buckets::new` always appends `BucketLimit::Unbounded`.
limits: Vec<BucketLimit>,
}
impl Buckets {
pub fn new(method: BucketMethod, label_name: &'static str, limits: &[i64]) -> Buckets {
let mut limits: Vec<BucketLimit> = limits.iter().map(|f| BucketLimit::Num(*f)).collect();
limits.push(BucketLimit::Unbounded);
Buckets {
method,
label_name,
limits,
}
}
pub fn assign_buckets(&self, value: f64) -> Vec<usize> {
match self.method {
BucketMethod::CumulFreq => self
.limits
.iter()
.enumerate()
.filter(|(_, limit)| match limit {
BucketLimit::Num(b) => (value <= *b as f64),
BucketLimit::Unbounded => true,
})
.map(|(i, _)| i)
.collect(),
BucketMethod::Freq => {
let mut min_limit_index = self.limits.len() - 1;
for (i, limit) in self.limits.iter().enumerate() {
if let BucketLimit::Num(b) = limit {
if value <= *b as f64 && *limit <= self.limits[min_limit_index] {
min_limit_index = i
}
}
}
vec![min_limit_index]
}
}
}
pub fn len(&self) -> usize {
self.limits.len()
}
pub fn is_empty(&self) -> bool {
self.limits.is_empty()
}
pub fn get(&self, index: usize) -> Option<BucketLimit> {
self.limits.get(index).cloned()
}
}
/// How a value is assigned to buckets (see `Buckets::assign_buckets`).
#[derive(Debug, Clone, Copy, serde::Serialize, PartialEq)]
pub enum BucketMethod {
/// The value is counted in exactly one bucket: the one with the smallest
/// limit that is at least the value.
Freq,
/// The value is counted in every bucket whose limit is at least the value.
CumulFreq,
}
/// The kind of a statistic.
#[derive(Debug, Clone, Copy, serde::Serialize, PartialEq)]
pub enum StatType {
/// Logged as "counter".
Counter,
/// Logged as "gauge".
Gauge,
/// A counter split into buckets by a value; logged as "bucket counter".
BucketCounter,
}
/// slog serialization for stat types: each variant is emitted as a fixed
/// lowercase string.
impl slog::Value for StatType {
    fn serialize(
        &self,
        _record: &::slog::Record<'_>,
        key: ::slog::Key,
        serializer: &mut dyn (::slog::Serializer),
    ) -> ::slog::Result {
        let label = match *self {
            StatType::Counter => "counter",
            StatType::Gauge => "gauge",
            StatType::BucketCounter => "bucket counter",
        };
        serializer.emit_str(key, label)
    }
}
/// Holds the current values of every registered statistic.
#[derive(Debug, Default)]
pub struct StatsTracker {
/// Registered statistics, keyed by `StatDefinition::name()`.
stats: HashMap<&'static str, Stat>,
}
impl StatsTracker {
pub fn new() -> Self {
Default::default()
}
pub fn add_statistic(&mut self, defn: &'static (dyn StatDefinition + Sync + RefUnwindSafe)) {
let stat = Stat {
defn,
is_grouped: !defn.group_by().is_empty(),
group_values: RwLock::new(HashMap::new()),
stat_type_data: StatTypeData::new(defn),
value: StatValue::new(0, 1),
};
self.stats.insert(defn.name(), stat);
}
fn update_stats(&self, log: &dyn StatTrigger) {
for stat_def in log.stat_list() {
if log.condition(stat_def) {
let stat = &self.stats.get(stat_def.defn.name()).unwrap_or_else(|| {
panic!(
"No statistic found with name {}, did you try writing a log through a
logger which wasn't initialized with your stats definitions?",
stat_def.defn.name()
)
});
stat.update(stat_def, log)
}
}
}
pub fn log_all<T: StatisticsLogFormatter>(&self, logger: &StatisticsLogger) {
for stat in self.stats.values() {
let outputs = stat.get_tagged_vals();
for (tag_values, val) in outputs {
let tags = stat.get_tag_pairs(&tag_values);
T::log_stat(
logger,
&StatLogData {
stype: stat.defn.stype(),
name: stat.defn.name(),
description: stat.defn.description(),
value: val,
tags,
},
); }
}
}
pub fn get_stats(&self) -> Vec<StatSnapshot> {
self.stats
.values()
.map(|stat| stat.get_snapshot())
.collect::<Vec<_>>()
}
}
/// Default interval, in seconds, between periodic logs of all statistics.
/// NOTE(review): not referenced in this file; presumably intended as the
/// `interval_secs` argument of `fuse_with_log_interval` — confirm.
pub const DEFAULT_LOG_INTERVAL_SECS: u64 = 300;
/// A static list of statistic definitions, as produced by `define_stats!`.
pub type StatDefinitions = &'static [&'static (dyn StatDefinition + Sync + RefUnwindSafe)];
/// Builder that collects statistic definitions and produces a
/// `StatisticsLogger`.
#[derive(Debug, Default)]
pub struct StatsLoggerBuilder {
/// The sets of statistic definitions to register when the logger is built.
pub stats: Vec<StatDefinitions>,
}
impl StatsLoggerBuilder {
    /// Sets the statistic definitions to track, replacing any set earlier.
    pub fn with_stats(mut self, stats: Vec<StatDefinitions>) -> Self {
        self.stats = stats;
        self
    }

    /// Builds a `StatisticsLogger` that wraps `logger` and tracks every
    /// statistic given to this builder.
    pub fn fuse(self, logger: slog::Logger) -> StatisticsLogger {
        let mut tracker = StatsTracker::new();
        for defn in self.stats.into_iter().flat_map(|set| set.iter()) {
            tracker.add_statistic(*defn);
        }
        StatisticsLogger {
            logger,
            tracker: Arc::new(tracker),
        }
    }

    /// Like `fuse`, but also spawns a tokio task that logs all statistics
    /// through the formatter `T` every `interval_secs` seconds.
    #[cfg(feature = "interval_logging")]
    pub fn fuse_with_log_interval<T: StatisticsLogFormatter>(
        self,
        interval_secs: u64,
        logger: slog::Logger,
    ) -> StatisticsLogger {
        let stats_logger = self.fuse(logger);
        let periodic_logger = stats_logger.clone();
        tokio::spawn(async move {
            let period = Duration::from_secs(interval_secs);
            let mut ticker = tokio::time::interval(period);
            // Per the tokio docs the first tick of an interval completes
            // immediately; consume it so the first report comes after one
            // full period.
            ticker.tick().await;
            loop {
                ticker.tick().await;
                periodic_logger.tracker.log_all::<T>(&periodic_logger);
            }
        });
        stats_logger
    }
}
/// The data passed to a `StatisticsLogFormatter` for one statistic value.
#[derive(Debug)]
pub struct StatLogData<'a> {
/// The statistic's description.
pub description: &'static str,
/// The statistic's kind.
pub stype: StatType,
/// The statistic's name.
pub name: &'static str,
/// The current value.
pub value: f64,
/// `(tag name, tag value)` pairs identifying which group (and, for bucket
/// counters, which bucket) this value belongs to.
pub tags: Vec<(&'static str, &'a str)>,
}
/// The default formatter: logs each statistic value as an `info!` record.
#[derive(Debug, Clone)]
pub struct DefaultStatisticsLogFormatter;
/// The `log_id` value attached to records emitted by
/// `DefaultStatisticsLogFormatter`.
pub static DEFAULT_LOG_ID: &str = "STATS-1";
impl StatisticsLogFormatter for DefaultStatisticsLogFormatter {
    /// Emits one `info!` record carrying the statistic's id, name, type,
    /// description and value, with the tags rendered as a comma-separated
    /// list of `name=value` pairs.
    fn log_stat(logger: &StatisticsLogger, stat: &StatLogData<'_>)
    where
        Self: Sized,
    {
        info!(logger, "New statistic value";
              "log_id" => DEFAULT_LOG_ID,
              "name" => stat.name,
              "metric_type" => stat.stype,
              "description" => stat.description,
              "value" => stat.value,
              "tags" => stat.tags
                  .iter()
                  .map(|(tag, value)| format!("{}={}", tag, value))
                  .collect::<Vec<_>>()
                  .join(","))
    }
}
/// Decides how a statistic value is written to the log.
///
/// Used as a type parameter only (see `StatsTracker::log_all`); no instance
/// of the formatter is ever constructed by this file.
pub trait StatisticsLogFormatter: Sync + Send + 'static {
/// Logs a single statistic value through `logger`.
fn log_stat(logger: &StatisticsLogger, stat: &StatLogData<'_>)
where
Self: Sized;
}
/// A logger that wraps a `slog::Logger` together with a shared
/// `StatsTracker`. Clones share the same tracker.
#[derive(Debug, Clone)]
pub struct StatisticsLogger {
/// The wrapped slog logger (also reachable through `Deref`).
logger: slog::Logger,
/// The statistics tracker shared by all clones of this logger.
tracker: Arc<StatsTracker>,
}
impl Deref for StatisticsLogger {
type Target = slog::Logger;
fn deref(&self) -> &Self::Target {
&self.logger
}
}
impl StatisticsLogger {
pub fn with_params<P>(&self, params: slog::OwnedKV<P>) -> Self
where
P: slog::SendSyncRefUnwindSafeKV + 'static,
{
StatisticsLogger {
logger: self.logger.new(params),
tracker: self.tracker.clone(),
} }
pub fn update_stats(&self, log: &dyn StatTrigger) {
self.tracker.update_stats(log)
}
pub fn set_slog_logger(&mut self, logger: slog::Logger) {
self.logger = logger;
}
pub fn get_stats(&self) -> Vec<StatSnapshot> {
self.tracker.get_stats()
}
}
/// The values captured in a statistic snapshot, by statistic kind.
#[derive(Debug)]
pub enum StatSnapshotValues {
/// Values of a counter, one entry per group.
Counter(Vec<StatSnapshotValue>),
/// Values of a gauge, one entry per group.
Gauge(Vec<StatSnapshotValue>),
/// The bucket configuration of a bucket counter, plus one entry per
/// (group, bucket) pair together with that bucket's limit.
BucketCounter(Buckets, Vec<(StatSnapshotValue, BucketLimit)>),
}
impl StatSnapshotValues {
    /// Whether the snapshot holds no values at all.
    pub fn is_empty(&self) -> bool {
        let count = match self {
            StatSnapshotValues::Counter(vals) => vals.len(),
            StatSnapshotValues::Gauge(vals) => vals.len(),
            StatSnapshotValues::BucketCounter(_, vals) => vals.len(),
        };
        count == 0
    }
}
/// A point-in-time copy of one statistic's values.
#[derive(Debug)]
pub struct StatSnapshot {
    /// The definition of the statistic the values belong to.
    pub definition: &'static dyn StatDefinition,
    /// The captured values.
    pub values: StatSnapshotValues,
}

impl StatSnapshot {
    /// Bundles a definition with its captured values.
    pub fn new(definition: &'static dyn StatDefinition, values: StatSnapshotValues) -> Self {
        Self { definition, values }
    }
}
/// One captured value of a statistic, with the group it belongs to.
#[derive(Debug)]
pub struct StatSnapshotValue {
    /// The values of the statistic's group-by tags; empty when ungrouped.
    pub group_values: Vec<String>,
    /// The value of the statistic for that group.
    pub value: f64,
}

impl StatSnapshotValue {
    /// Creates a snapshot value for the given group.
    pub fn new(group_values: Vec<String>, value: f64) -> Self {
        Self {
            group_values,
            value,
        }
    }
}
/// Per-kind state of a statistic. Only bucket counters carry extra data.
#[derive(Debug)]
enum StatTypeData {
/// A counter; no extra state.
Counter,
/// A gauge; no extra state.
Gauge,
/// A bucket counter, with its per-bucket values.
BucketCounter(BucketCounterData),
}
impl StatTypeData {
    /// Builds the per-kind state for `defn`.
    ///
    /// Panics if `defn` is a bucket counter whose `buckets()` is `None`.
    fn new(defn: &'static dyn StatDefinition) -> Self {
        match defn.stype() {
            StatType::Counter => StatTypeData::Counter,
            StatType::Gauge => StatTypeData::Gauge,
            StatType::BucketCounter => {
                let is_grouped = !defn.group_by().is_empty();
                let buckets = defn
                    .buckets()
                    .expect("Stat definition with type BucketCounter did not contain bucket info");
                StatTypeData::BucketCounter(BucketCounterData::new(buckets, is_grouped))
            }
        }
    }

    /// Updates the bucket values for bucket counters; does nothing for the
    /// other kinds.
    fn update(&self, defn: &StatDefinitionTagged, trigger: &dyn StatTrigger) {
        match self {
            StatTypeData::BucketCounter(data) => data.update(defn, trigger),
            StatTypeData::Counter | StatTypeData::Gauge => {}
        }
    }

    /// For bucket counters, pairs tag names (including the bucket label) with
    /// the comma-separated `tag_values`; `None` for the other kinds.
    fn get_tag_pairs<'a, 'b, 'c>(
        &'a self,
        tag_values: &'b str,
        defn: &'c dyn StatDefinition,
    ) -> Option<Vec<(&'static str, &'b str)>> {
        match self {
            StatTypeData::BucketCounter(data) => Some(data.get_tag_pairs(tag_values, defn)),
            StatTypeData::Counter | StatTypeData::Gauge => None,
        }
    }

    /// For bucket counters, the current values keyed by their tag-value
    /// string; `None` for the other kinds.
    fn get_tagged_vals(&self) -> Option<Vec<(String, f64)>> {
        match self {
            StatTypeData::BucketCounter(data) => Some(data.get_tagged_vals()),
            StatTypeData::Counter | StatTypeData::Gauge => None,
        }
    }
}
/// The state of a bucket counter statistic.
#[derive(Debug)]
struct BucketCounterData {
/// The bucket configuration.
buckets: Buckets,
/// One value per bucket, aggregated over all groups.
bucket_values: Vec<StatValue>,
/// Per-group bucket values, keyed by the comma-joined group tag values.
bucket_group_values: RwLock<HashMap<String, Vec<StatValue>>>,
/// Whether the statistic has any group-by tags.
is_grouped: bool,
}
impl BucketCounterData {
fn new(buckets: Buckets, is_grouped: bool) -> Self {
let buckets_len = buckets.len();
let mut bucket_values = Vec::new();
bucket_values.reserve_exact(buckets_len);
for _ in 0..buckets_len {
bucket_values.push(StatValue::new(0, 1));
}
BucketCounterData {
buckets,
bucket_values,
bucket_group_values: RwLock::new(HashMap::new()),
is_grouped,
}
}
fn update(&self, defn: &StatDefinitionTagged, trigger: &dyn StatTrigger) {
let bucket_value = trigger.bucket_value(defn).expect("Bad log definition");
let buckets_to_update = self.buckets.assign_buckets(bucket_value);
for index in &buckets_to_update {
self.bucket_values
.get(*index)
.expect("Invalid bucket index")
.update(&trigger.change(defn).expect("Bad log definition"));
}
if self.is_grouped {
self.update_grouped(defn, trigger, &buckets_to_update);
}
}
fn update_grouped(
&self,
defn: &StatDefinitionTagged,
trigger: &dyn StatTrigger,
buckets_to_update: &[usize],
) {
let change = trigger.change(defn).expect("Bad log definition");
let tag_values = defn
.defn
.group_by()
.iter()
.map(|n| trigger.tag_value(defn, n))
.collect::<Vec<String>>()
.join(",");
let found_values = {
let inner_vals = self.bucket_group_values.read().expect("Poisoned lock");
if let Some(tagged_bucket_vals) = inner_vals.get(&tag_values) {
update_bucket_values(tagged_bucket_vals, buckets_to_update, &change);
true
} else {
false
}
};
if !found_values {
let mut new_bucket_vals = Vec::new();
let bucket_len = self.buckets.len();
new_bucket_vals.reserve_exact(bucket_len);
for _ in 0..bucket_len {
new_bucket_vals.push(StatValue::new(0, 1));
}
let mut inner_vals = self.bucket_group_values.write().expect("Poisoned lock");
let vals = inner_vals
.entry(tag_values)
.or_insert_with(|| new_bucket_vals);
update_bucket_values(vals, buckets_to_update, &change);
}
}
fn get_tag_pairs<'a, 'b, 'c>(
&'a self,
tag_values: &'b str,
defn: &'c dyn StatDefinition,
) -> Vec<(&'static str, &'b str)> {
let mut tag_names = defn.group_by();
tag_names.push(self.buckets.label_name);
tag_names
.iter()
.cloned()
.zip(tag_values.split(','))
.collect::<Vec<_>>()
}
fn get_tagged_vals(&self) -> Vec<(String, f64)> {
if self.is_grouped {
let mut tag_bucket_vals = Vec::new();
{
let inner_vals = self.bucket_group_values.read().expect("Poisoned lock");
for (group_values_str, bucket_values) in inner_vals.iter() {
for (index, val) in bucket_values.iter().enumerate() {
tag_bucket_vals.push((group_values_str.to_string(), index, val.as_float()));
}
}
}
tag_bucket_vals
.into_iter()
.map(|(mut tag_values, index, val)| {
let bucket = self.buckets.get(index).expect("Invalid bucket index");
tag_values.push_str(&format!(",{}", bucket));
(tag_values, val)
})
.collect()
} else {
self.bucket_values
.iter()
.enumerate()
.map(|(index, val)| {
let bucket = self.buckets.get(index).expect("Invalid bucket index");
(bucket.to_string(), val.as_float())
})
.collect()
}
}
fn get_snapshot_values(&self) -> Vec<(StatSnapshotValue, BucketLimit)> {
if self.is_grouped {
let mut tag_bucket_vals = Vec::new();
{
let inner_vals = self.bucket_group_values.read().expect("Poisoned lock");
for (group_values_str, bucket_values) in inner_vals.iter() {
for (index, val) in bucket_values.iter().enumerate() {
tag_bucket_vals.push((group_values_str.to_string(), index, val.as_float()));
}
}
}
tag_bucket_vals
.into_iter()
.map(|(tag_values, index, val)| {
let bucket = self.buckets.get(index).expect("Invalid bucket index");
let group_values = tag_values
.split(',')
.map(|group| group.to_string())
.collect::<Vec<_>>();
(StatSnapshotValue::new(group_values, val), bucket)
})
.collect()
} else {
self.bucket_values
.iter()
.enumerate()
.map(|(index, val)| {
let bucket = self.buckets.get(index).expect("Invalid bucket index");
(StatSnapshotValue::new(vec![], val.as_float()), bucket)
})
.collect()
}
}
}
/// The tracked state of one registered statistic.
#[derive(Debug)]
struct Stat {
/// The statistic's definition.
defn: &'static (dyn StatDefinition + Sync + RefUnwindSafe),
/// The overall value, aggregated over all groups.
value: StatValue,
/// Whether the statistic has any group-by tags.
is_grouped: bool,
/// Per-group values, keyed by the comma-joined group tag values.
group_values: RwLock<HashMap<String, StatValue>>,
/// Extra state specific to the statistic's kind (bucket values).
stat_type_data: StatTypeData,
}
impl Stat {
    /// Pairs this statistic's tag names with the comma-separated values in
    /// `tag_values`. Bucket counters also include the bucket label.
    fn get_tag_pairs<'a, 'b>(&'a self, tag_values: &'b str) -> Vec<(&'static str, &'b str)> {
        match self.stat_type_data.get_tag_pairs(tag_values, self.defn) {
            Some(pairs) => pairs,
            None => self
                .defn
                .group_by()
                .into_iter()
                .zip(tag_values.split(','))
                .collect(),
        }
    }

    /// Returns the current values keyed by their tag-value string. An
    /// ungrouped counter or gauge yields a single entry with an empty key.
    fn get_tagged_vals(&self) -> Vec<(String, f64)> {
        if let Some(bucketed) = self.stat_type_data.get_tagged_vals() {
            return bucketed;
        }
        if !self.is_grouped {
            return vec![("".to_string(), self.value.as_float())];
        }
        let groups = self.group_values.read().expect("Poisoned lock");
        let mut tagged = Vec::with_capacity(groups.len());
        for (group_key, value) in groups.iter() {
            tagged.push((group_key.to_string(), value.as_float()));
        }
        tagged
    }

    /// Applies the trigger's change to the overall value, then to the
    /// trigger's group (when grouped), then to any kind-specific state.
    ///
    /// Panics if the trigger provides no change for this statistic.
    fn update(&self, defn: &StatDefinitionTagged, trigger: &dyn StatTrigger) {
        let change = trigger.change(defn).expect("Bad log definition");
        self.value.update(&change);
        if self.is_grouped {
            self.update_grouped(defn, trigger)
        }
        self.stat_type_data.update(defn, trigger);
    }

    /// Applies the trigger's change to the value of the trigger's group,
    /// creating the group (starting from zero) on first use.
    fn update_grouped(&self, defn: &StatDefinitionTagged, trigger: &dyn StatTrigger) {
        let change = trigger.change(defn).expect("Bad log definition");
        let group_key = self
            .defn
            .group_by()
            .into_iter()
            .map(|name| trigger.tag_value(defn, name))
            .collect::<Vec<String>>()
            .join(",");

        // Common case: the group already exists, so a read lock is enough
        // (the value itself is atomic).
        let updated_existing = {
            let groups = self.group_values.read().expect("Poisoned lock");
            match groups.get(&group_key) {
                Some(existing) => {
                    existing.update(&change);
                    true
                }
                None => false,
            }
        };
        if updated_existing {
            return;
        }

        // First sighting of this group. `entry` keeps whatever is already in
        // the table if another thread inserted the group in the meantime.
        let mut groups = self.group_values.write().expect("Poisoned lock");
        groups
            .entry(group_key)
            .or_insert_with(|| StatValue::new(0, 1))
            .update(&change);
    }

    /// Captures the current values of this statistic.
    fn get_snapshot(&self) -> StatSnapshot {
        let values = match &self.stat_type_data {
            StatTypeData::BucketCounter(data) => {
                let buckets = data.buckets.clone();
                StatSnapshotValues::BucketCounter(buckets, data.get_snapshot_values())
            }
            StatTypeData::Counter => StatSnapshotValues::Counter(self.get_snapshot_values()),
            StatTypeData::Gauge => StatSnapshotValues::Gauge(self.get_snapshot_values()),
        };
        StatSnapshot::new(self.defn, values)
    }

    /// Converts the tagged values into snapshot entries, splitting each
    /// comma-joined key back into its group values (none for an empty key).
    fn get_snapshot_values(&self) -> Vec<StatSnapshotValue> {
        self.get_tagged_vals()
            .into_iter()
            .map(|(group_key, value)| {
                let group_values: Vec<String> = if group_key.is_empty() {
                    Vec::new()
                } else {
                    group_key.split(',').map(String::from).collect()
                };
                StatSnapshotValue::new(group_values, value)
            })
            .collect()
    }
}
/// An atomically updated statistic value, stored as an integer numerator
/// with a fixed divisor.
#[derive(Debug)]
struct StatValue {
    /// The current numerator.
    num: AtomicIsize,
    /// The divisor applied when the value is read as a float.
    divisor: u64,
}

impl StatValue {
    /// Creates a value of `num / divisor`.
    fn new(num: isize, divisor: u64) -> Self {
        StatValue {
            num: AtomicIsize::new(num),
            divisor,
        }
    }

    /// Applies `change` to the numerator.
    ///
    /// Returns `true` for increments and decrements; for `SetTo`, returns
    /// whether the stored value actually changed.
    fn update(&self, change: &ChangeType) -> bool {
        match *change {
            ChangeType::Incr(amount) => {
                self.num.fetch_add(amount as isize, Ordering::Relaxed);
                true
            }
            ChangeType::Decr(amount) => {
                self.num.fetch_sub(amount as isize, Ordering::Relaxed);
                true
            }
            ChangeType::SetTo(target) => self.num.swap(target, Ordering::Relaxed) != target,
        }
    }

    /// Returns the current value as `num / divisor`.
    fn as_float(&self) -> f64 {
        let num = self.num.load(Ordering::Relaxed);
        // Convert the divisor straight to `f64`: routing it through `isize`
        // would wrap large divisors to negative numbers (and truncate them on
        // 32-bit targets), flipping or corrupting the result.
        num as f64 / self.divisor as f64
    }
}
/// Applies `change` to each of `bucket_values` selected by
/// `buckets_to_update`.
///
/// Panics with "Invalid bucket index" if an index is out of range.
fn update_bucket_values(
    bucket_values: &[StatValue],
    buckets_to_update: &[usize],
    change: &ChangeType,
) {
    for &index in buckets_to_update {
        let slot = bucket_values.get(index).expect("Invalid bucket index");
        slot.update(change);
    }
}
// Unit tests for this module.
#[cfg(test)]
mod tests {
use super::*;
// A formatter type that does not implement `Clone`. It is never used at
// runtime, hence the `dead_code` allowance.
#[allow(dead_code)]
struct DummyNonCloneFormatter;
impl StatisticsLogFormatter for DummyNonCloneFormatter {
// No-op: this formatter discards every statistic.
fn log_stat(_logger: &StatisticsLogger, _stat: &StatLogData<'_>)
where
Self: Sized,
{
}
}
// Compile-time check that `StatisticsLogger` implements `Clone`.
#[test]
fn check_clone() {
let builder = StatsLoggerBuilder::default();
let logger = builder.fuse(slog::Logger::root(slog::Discard, slog::o!()));
// Only compiles if `T: Clone` holds for the logger type.
fn is_clone<T: Clone>(_: &T) {}
is_clone(&logger);
}
}