use std::str::FromStr;
use chrono::{DateTime, Duration, Utc};
use eyre::{eyre, ErrReport};
use nom::{
branch::alt,
bytes::complete::take_while_m_n,
character::complete::char,
combinator::value,
number::complete::double,
sequence::{separated_pair, terminated},
Finish,
{bytes::complete::tag, IResult},
};
use serde::{Deserialize, Serialize};
use super::{MetricStringKey, MetricTimestamp};
use crate::util::serialization::float_to_duration;
/// A single metric measurement together with the time it was recorded.
///
/// Every variant carries a `timestamp`; [`MetricReading::timestamp`] returns it
/// without the caller having to match on the variant.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum MetricReading {
/// Numeric reading that additionally carries an `interval`.
/// NOTE(review): presumably `interval` is the span of time over which `value`
/// is weighted - confirm against the code that aggregates these readings.
TimeWeightedAverage {
value: f64,
timestamp: MetricTimestamp,
// (De)serialized through the `float_to_duration` helper module.
#[serde(with = "float_to_duration")]
interval: Duration,
},
/// Numeric counter reading. The StatsD parser in this file produces it for
/// both the `c` and the `ms` type codes.
Counter {
value: f64,
timestamp: MetricTimestamp,
},
/// Numeric gauge reading. Produced for the StatsD `g` type code and for
/// `KEY=VALUE` arguments whose value parses as `f64`.
Gauge {
value: f64,
timestamp: MetricTimestamp,
},
/// Numeric histogram sample. Produced for the StatsD `h` type code.
Histogram {
value: f64,
timestamp: MetricTimestamp,
},
/// String-valued reading. Produced for `KEY=VALUE` arguments whose value does
/// not parse as `f64`.
ReportTag {
value: String,
timestamp: MetricTimestamp,
},
/// Numeric reading.
/// NOTE(review): presumably a received-signal-strength sample - confirm the
/// unit with the producer of this variant.
Rssi {
value: f64,
timestamp: MetricTimestamp,
},
/// Boolean-valued reading.
Bool {
value: bool,
timestamp: MetricTimestamp,
},
}
impl MetricReading {
    /// Parse the `<value>|<type>` part of a StatsD line.
    ///
    /// `<value>` is parsed as a floating point number and `<type>` as a
    /// [`StatsDMetricType`]. The resulting reading is stamped with the current
    /// UTC time. Any input left over after the type code is returned untouched
    /// as the first element of the tuple.
    fn parse(input: &str) -> IResult<&str, MetricReading> {
        let (rest, (value, kind)) =
            separated_pair(double, tag("|"), StatsDMetricType::parse)(input)?;

        // The timestamp is only taken once the text has parsed successfully.
        let timestamp = Utc::now();

        let reading = match kind {
            StatsDMetricType::Gauge => MetricReading::Gauge { value, timestamp },
            StatsDMetricType::Histogram => MetricReading::Histogram { value, timestamp },
            // Timers have no dedicated variant; they are recorded as counters.
            StatsDMetricType::Counter | StatsDMetricType::Timer => {
                MetricReading::Counter { value, timestamp }
            }
        };

        Ok((rest, reading))
    }

    /// Time at which this reading was recorded, regardless of its variant.
    pub fn timestamp(&self) -> DateTime<Utc> {
        match self {
            Self::TimeWeightedAverage { timestamp, .. }
            | Self::Counter { timestamp, .. }
            | Self::Gauge { timestamp, .. }
            | Self::Histogram { timestamp, .. }
            | Self::ReportTag { timestamp, .. }
            | Self::Rssi { timestamp, .. }
            | Self::Bool { timestamp, .. } => *timestamp,
        }
    }
}
/// A [`MetricReading`] paired with the key of the metric it belongs to.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct KeyedMetricReading {
/// Key identifying the metric this reading was taken for.
pub name: MetricStringKey,
/// The recorded reading (value and timestamp).
pub value: MetricReading,
}
impl KeyedMetricReading {
pub fn new(name: MetricStringKey, value: MetricReading) -> Self {
Self { name, value }
}
pub fn new_gauge(name: MetricStringKey, value: f64) -> Self {
Self {
name,
value: MetricReading::Gauge {
value,
timestamp: Utc::now(),
},
}
}
pub fn new_histogram(name: MetricStringKey, value: f64) -> Self {
Self {
name,
value: MetricReading::Histogram {
value,
timestamp: Utc::now(),
},
}
}
pub fn new_counter(name: MetricStringKey, value: f64) -> Self {
Self {
name,
value: MetricReading::Counter {
value,
timestamp: Utc::now(),
},
}
}
pub fn new_report_tag(name: MetricStringKey, value: String) -> Self {
Self {
name,
value: MetricReading::ReportTag {
value,
timestamp: Utc::now(),
},
}
}
pub fn from_statsd_str(s: &str, legacy_key: bool) -> Result<Self, ErrReport> {
match Self::parse_statsd(s, legacy_key).finish() {
Ok((_, (string_key, reading))) => {
let metric_key = MetricStringKey::from_str(&string_key)
.map_err(|e| eyre!("Invalid metric key: {}", e))?;
Ok(KeyedMetricReading::new(metric_key, reading))
}
Err(e) => Err(eyre!(
"Failed to parse string \"{}\" as a StatsD metric reading: {}",
s,
e
)),
}
}
pub fn increment_counter(name: MetricStringKey) -> Self {
Self {
name,
value: MetricReading::Counter {
value: 1.0,
timestamp: Utc::now(),
},
}
}
pub fn add_to_counter(name: MetricStringKey, value: f64) -> Self {
Self {
name,
value: MetricReading::Counter {
value,
timestamp: Utc::now(),
},
}
}
fn parse_statsd(input: &str, use_legacy_keys: bool) -> IResult<&str, (String, MetricReading)> {
let is_valid_ascii = |c: char| c.is_ascii() && c != ':';
let (value_str, name) =
terminated(take_while_m_n(0, 128, is_valid_ascii), char(':'))(input)?;
let (remaining, value) = MetricReading::parse(value_str)?;
let prefix = match &value {
MetricReading::Counter { .. } => "statsd/count",
_ => "statsd/gauge",
};
let key_string = if use_legacy_keys {
format!("{}/{}", prefix, name)
} else {
name.to_string()
};
Ok((remaining, (key_string, value)))
}
fn from_arg_str(s: &str) -> Result<Self, ErrReport> {
let (key, value_str) = s.split_once('=').ok_or(eyre!(
"Attached metric reading should be specified as KEY=VALUE"
))?;
let metric_key = MetricStringKey::from_str(key).map_err(|e| eyre!(e))?;
if let Ok(value) = f64::from_str(value_str) {
let reading = MetricReading::Gauge {
value,
timestamp: Utc::now(),
};
Ok(KeyedMetricReading::new(metric_key, reading))
} else {
let reading = MetricReading::ReportTag {
value: value_str.to_string(),
timestamp: Utc::now(),
};
Ok(KeyedMetricReading::new(metric_key, reading))
}
}
}
impl FromStr for KeyedMetricReading {
    type Err = ErrReport;

    /// Parse `s` as a StatsD line (`<name>:<value>|<type>`, non-legacy key)
    /// first, and fall back to the `KEY=VALUE` form when that fails.
    ///
    /// Only the error of the `KEY=VALUE` attempt is reported; the StatsD
    /// parse error is discarded.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Self::from_statsd_str(s, false).or_else(|_| {
            Self::from_arg_str(s)
                .map_err(|e| eyre!("Couldn't parse \"{}\" as a Gauge metric: {}", s, e))
        })
    }
}
/// Metric type code of a StatsD line, i.e. the part that follows the `|`
/// separator. The code for each variant is the one matched by
/// `StatsDMetricType::parse`.
#[derive(Debug, Clone)]
enum StatsDMetricType {
/// Type code `c`.
Counter,
/// Type code `h`.
Histogram,
/// Type code `g`.
Gauge,
/// Type code `ms`. `MetricReading::parse` records it as a `Counter` reading.
Timer,
}
impl StatsDMetricType {
    /// Recognise a StatsD type code at the start of `input`:
    /// `c` (counter), `h` (histogram), `g` (gauge) or `ms` (timer).
    fn parse(input: &str) -> IResult<&str, Self> {
        // Alternatives are tried in the order listed.
        let mut type_code = alt((
            value(Self::Counter, char('c')),
            value(Self::Histogram, char('h')),
            value(Self::Gauge, char('g')),
            value(Self::Timer, tag("ms")),
        ));
        type_code(input)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::setup_logger;
    use rstest::rstest;

    /// `KEY=VALUE` arguments parse for both numeric and free-form values.
    #[rstest]
    #[case("testCounter=1")]
    #[case("hello=world")]
    #[case("float=100.0")]
    fn parse_valid_arg_reading(#[case] input: &str, _setup_logger: ()) {
        let parsed = input.parse::<KeyedMetricReading>();
        assert!(parsed.is_ok())
    }

    /// Well-formed StatsD lines parse, including names containing spaces,
    /// braces or quotes and lines with trailing characters after the type code.
    #[rstest]
    #[case("testCounter:1|c")]
    #[case("test_counter:1.0|c")]
    #[case("test_histo:100|h")]
    #[case("test_gauge:1.7|g")]
    #[case("cpu3_idle:100.9898|g")]
    #[case("some_negative_gauge:-87.55|g")]
    #[case("test_timer:3600000|ms")]
    #[case("test Counter:1|c")]
    #[case("{test_counter:1.0|c}")]
    #[case("\"test_counter\":1.0|c}")]
    fn parse_valid_statsd_reading(#[case] input: &str, _setup_logger: ()) {
        let parsed = input.parse::<KeyedMetricReading>();
        assert!(parsed.is_ok())
    }

    /// StatsD lines whose value is not a number are rejected.
    #[rstest]
    #[case("test_gauge:\"string-value\"|g")]
    #[case(":\"string-value\"|g")]
    #[case("test_gauge:string-value|g")]
    fn fail_on_invalid_statsd_reading(#[case] input: &str, _setup_logger: ()) {
        let parsed = input.parse::<KeyedMetricReading>();
        assert!(parsed.is_err())
    }

    /// With legacy keys enabled, counters and timers land under
    /// `statsd/count/` and everything else under `statsd/gauge/`.
    #[rstest]
    #[case("testCounter:1|c", "statsd/count/testCounter")]
    #[case("testgauge:1.7|g", "statsd/gauge/testgauge")]
    #[case("test_histo:100|h", "statsd/gauge/test_histo")]
    #[case("test_timer:3600000|ms", "statsd/count/test_timer")]
    fn parse_statsd_key_with_legacy(#[case] input: &str, #[case] expected_key: &str) {
        let parsed = KeyedMetricReading::from_statsd_str(input, true).unwrap();
        let expected = MetricStringKey::from_str(expected_key).unwrap();
        assert_eq!(parsed.name, expected);
    }
}