use std::borrow::Cow;
use std::collections::hash_map::{DefaultHasher, Entry};
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::fmt::{self, Write};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use sentry_types::protocol::latest::{Envelope, EnvelopeItem};
use crate::client::TransportArc;
use crate::{ClientOptions, Hub};
pub use crate::units::*;
/// Width of one aggregation bucket; metric timestamps are floored to a multiple of this.
const BUCKET_INTERVAL: Duration = Duration::from_secs(10);

/// How long the background worker sleeps between periodic flush attempts.
const FLUSH_INTERVAL: Duration = Duration::from_secs(5);

/// Total pending bucket weight above which `MetricAggregator::add` requests an early flush.
const MAX_WEIGHT: usize = 100_000;

/// String type used for metric names, tag keys and tag values: either a `'static` borrow or
/// an owned string.
pub type MetricStr = Cow<'static, str>;

/// Value recorded by a counter metric.
pub type CounterValue = f64;

/// A single sample recorded by a distribution metric.
pub type DistributionValue = f64;

/// A set member; `MetricValue::set_from_str` produces it by hashing a string.
pub type SetValue = u32;

/// Value recorded by a gauge metric.
pub type GaugeValue = f64;
/// A single observed value, tagged with the kind of metric it belongs to.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum MetricValue {
    /// Added to the bucket's running total.
    Counter(CounterValue),
    /// Appended to the bucket's list of samples.
    Distribution(DistributionValue),
    /// Inserted into the bucket's set of unique members.
    Set(SetValue),
    /// Folded into the bucket's last/min/max/sum/count summary.
    Gauge(GaugeValue),
}

impl MetricValue {
    /// Builds a set value by hashing `string` down to a [`SetValue`].
    pub fn set_from_str(string: &str) -> Self {
        let member = hash_set_value(string);
        MetricValue::Set(member)
    }

    /// Builds a set value from the `Display` rendering of `display`.
    ///
    /// Equivalent to [`MetricValue::set_from_str`] applied to `display.to_string()`.
    pub fn set_from_display(display: impl fmt::Display) -> Self {
        let rendered = display.to_string();
        Self::set_from_str(&rendered)
    }

    /// The metric type that corresponds to this value's variant.
    fn ty(&self) -> MetricType {
        match *self {
            MetricValue::Counter(..) => MetricType::Counter,
            MetricValue::Distribution(..) => MetricType::Distribution,
            MetricValue::Set(..) => MetricType::Set,
            MetricValue::Gauge(..) => MetricType::Gauge,
        }
    }
}
/// Reduces `string` to the 32-bit member identifier stored in set buckets.
///
/// The raw UTF-8 bytes are fed to the standard library's `DefaultHasher`, and the 64-bit
/// digest is truncated to its low 32 bits. `DefaultHasher`'s algorithm is not guaranteed to
/// stay the same across Rust releases, so these identifiers are not stable across toolchains.
fn hash_set_value(string: &str) -> u32 {
    use std::hash::Hasher;

    let mut state = DefaultHasher::new();
    state.write(string.as_bytes());
    let digest: u64 = state.finish();

    // Truncation is intentional: only the low half of the digest is kept.
    (digest & 0xFFFF_FFFF) as u32
}
/// The kind of a metric, as encoded after the `|` separator in the statsd-like line format.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
enum MetricType {
    Counter,
    Distribution,
    Set,
    Gauge,
}

impl MetricType {
    /// Returns the canonical single-letter type code.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Counter => "c",
            Self::Distribution => "d",
            Self::Set => "s",
            Self::Gauge => "g",
        }
    }
}

impl fmt::Display for MetricType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let code = self.as_str();
        f.write_str(code)
    }
}

impl std::str::FromStr for MetricType {
    type Err = ();

    /// Parses a type code. Besides the canonical codes, this accepts `m` for counters and
    /// `h` / `ms` for distributions.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "c" | "m" => Ok(Self::Counter),
            "d" | "h" | "ms" => Ok(Self::Distribution),
            "s" => Ok(Self::Set),
            "g" => Ok(Self::Gauge),
            _ => Err(()),
        }
    }
}
/// Running summary of every value recorded for one gauge bucket.
#[derive(Clone, Copy, Debug, PartialEq)]
struct GaugeSummary {
    /// Most recently inserted value.
    pub last: GaugeValue,
    /// Smallest value seen so far.
    pub min: GaugeValue,
    /// Largest value seen so far.
    pub max: GaugeValue,
    /// Sum of all values seen so far.
    pub sum: GaugeValue,
    /// Number of values folded into this summary.
    pub count: u64,
}

impl GaugeSummary {
    /// Starts a summary from a single observation.
    pub fn single(value: GaugeValue) -> Self {
        Self {
            last: value,
            min: value,
            max: value,
            sum: value,
            count: 1,
        }
    }

    /// Folds `value` into the summary.
    pub fn insert(&mut self, value: GaugeValue) {
        self.count += 1;
        self.sum += value;
        self.max = self.max.max(value);
        self.min = self.min.min(value);
        self.last = value;
    }
}
/// Aggregated state of one bucket.
///
/// Values of a different metric kind must not be inserted; `insert` panics on a mismatch.
#[derive(Debug)]
enum BucketValue {
    /// Running total of all counter increments.
    Counter(CounterValue),
    /// Every distribution sample, in insertion order.
    Distribution(Vec<DistributionValue>),
    /// Unique set members, kept sorted.
    Set(BTreeSet<SetValue>),
    /// Summary of all gauge observations.
    Gauge(GaugeSummary),
}

impl BucketValue {
    /// Merges `value` into this bucket and returns how much the bucket's weight grew.
    ///
    /// # Panics
    ///
    /// Panics if `value` is a different kind of metric than this bucket.
    pub fn insert(&mut self, value: MetricValue) -> usize {
        match (self, value) {
            (Self::Counter(total), MetricValue::Counter(delta)) => {
                *total += delta;
                0
            }
            (Self::Distribution(samples), MetricValue::Distribution(sample)) => {
                samples.push(sample);
                1
            }
            // A member already in the set adds no weight.
            (Self::Set(members), MetricValue::Set(member)) => usize::from(members.insert(member)),
            (Self::Gauge(summary), MetricValue::Gauge(observation)) => {
                summary.insert(observation);
                0
            }
            _ => panic!("invalid metric type"),
        }
    }

    /// Approximate in-memory cost of this bucket, compared against the flush threshold.
    pub fn weight(&self) -> usize {
        match self {
            Self::Counter(_) => 1,
            Self::Gauge(_) => 5,
            Self::Distribution(samples) => samples.len(),
            Self::Set(members) => members.len(),
        }
    }
}

impl From<MetricValue> for BucketValue {
    /// Creates a fresh bucket holding just `value`.
    fn from(value: MetricValue) -> Self {
        match value {
            MetricValue::Counter(total) => Self::Counter(total),
            MetricValue::Distribution(sample) => Self::Distribution(Vec::from([sample])),
            MetricValue::Set(member) => Self::Set(BTreeSet::from([member])),
            MetricValue::Gauge(observation) => Self::Gauge(GaugeSummary::single(observation)),
        }
    }
}
/// A single metric observation.
///
/// Obtain a [`MetricBuilder`] from one of the constructors (e.g. [`Metric::count`],
/// [`Metric::gauge`]), then call [`MetricBuilder::send`] or [`MetricBuilder::finish`].
#[derive(Debug)]
pub struct Metric {
    /// Metric name.
    name: MetricStr,
    /// Unit attached to the name; `MetricUnit::None` means no unit.
    unit: MetricUnit,
    /// The observed value, which also determines the metric type.
    value: MetricValue,
    /// Custom tags, ordered by key.
    tags: BTreeMap<MetricStr, MetricStr>,
    /// Explicit observation time; `None` means "now" when the metric is aggregated.
    time: Option<SystemTime>,
}

impl Metric {
    /// Starts building a metric with the given name and value, with no unit, tags or time.
    pub fn build(name: impl Into<MetricStr>, value: MetricValue) -> MetricBuilder {
        MetricBuilder {
            metric: Self {
                name: name.into(),
                unit: MetricUnit::None,
                value,
                tags: BTreeMap::new(),
                time: None,
            },
        }
    }

    /// Parses a single statsd-style line such as `name@unit:value|type|#tag:value`.
    ///
    /// # Errors
    ///
    /// Returns [`ParseMetricError`] if the line is malformed.
    pub fn parse_statsd(string: &str) -> Result<Self, ParseMetricError> {
        match parse_metric_opt(string) {
            Some(metric) => Ok(metric),
            None => Err(ParseMetricError(())),
        }
    }

    /// Counter metric that increments by `value`.
    pub fn incr(name: impl Into<MetricStr>, value: f64) -> MetricBuilder {
        Self::build(name, MetricValue::Counter(value))
    }

    /// Counter metric that increments by one.
    pub fn count(name: impl Into<MetricStr>) -> MetricBuilder {
        Self::incr(name, 1.0)
    }

    /// Distribution sample of `timing`, recorded in seconds with unit `DurationUnit::Second`.
    pub fn timing(name: impl Into<MetricStr>, timing: Duration) -> MetricBuilder {
        let seconds = timing.as_secs_f64();
        Self::distribution(name, seconds).with_unit(DurationUnit::Second)
    }

    /// A single distribution sample.
    pub fn distribution(name: impl Into<MetricStr>, value: f64) -> MetricBuilder {
        Self::build(name, MetricValue::Distribution(value))
    }

    /// Set metric whose member is the hash of `string`.
    pub fn set(name: impl Into<MetricStr>, string: &str) -> MetricBuilder {
        let member = MetricValue::set_from_str(string);
        Self::build(name, member)
    }

    /// Gauge observation.
    pub fn gauge(name: impl Into<MetricStr>, value: f64) -> MetricBuilder {
        Self::build(name, MetricValue::Gauge(value))
    }

    /// Hands the metric to the client bound to the current hub; does nothing if there is
    /// no client.
    pub fn send(self) {
        let Some(client) = Hub::current().client() else {
            return;
        };
        client.add_metric(self);
    }
}
/// Fluent builder returned by the [`Metric`] constructors.
///
/// The metric is only recorded once [`MetricBuilder::send`] is called, hence `#[must_use]`.
#[must_use]
#[derive(Debug)]
pub struct MetricBuilder {
    metric: Metric,
}

impl MetricBuilder {
    /// Sets the metric's unit, replacing any previous one.
    pub fn with_unit(self, unit: impl Into<MetricUnit>) -> Self {
        let mut metric = self.metric;
        metric.unit = unit.into();
        Self { metric }
    }

    /// Adds a tag; setting the same tag name again overwrites the earlier value.
    pub fn with_tag(self, name: impl Into<MetricStr>, value: impl Into<MetricStr>) -> Self {
        let mut metric = self.metric;
        metric.tags.insert(name.into(), value.into());
        Self { metric }
    }

    /// Sets an explicit observation time.
    pub fn with_time(self, time: SystemTime) -> Self {
        let mut metric = self.metric;
        metric.time = Some(time);
        Self { metric }
    }

    /// Returns the built metric without sending it.
    pub fn finish(self) -> Metric {
        let Self { metric } = self;
        metric
    }

    /// Finishes the metric and sends it via [`Metric::send`].
    pub fn send(self) {
        let metric = self.finish();
        metric.send();
    }
}
/// Error returned by [`Metric::parse_statsd`] when the input is not a valid metric line.
#[derive(Debug)]
pub struct ParseMetricError(());

impl fmt::Display for ParseMetricError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "invalid metric string")
    }
}

impl std::error::Error for ParseMetricError {}
fn parse_metric_opt(string: &str) -> Option<Metric> {
let mut components = string.split('|');
let (mri_str, value_str) = components.next()?.split_once(':')?;
let (name, unit) = match mri_str.split_once('@') {
Some((name, unit_str)) => (name, unit_str.parse().ok()?),
None => (mri_str, MetricUnit::None),
};
let ty = components.next().and_then(|s| s.parse().ok())?;
let value = match ty {
MetricType::Counter => MetricValue::Counter(value_str.parse().ok()?),
MetricType::Distribution => MetricValue::Distribution(value_str.parse().ok()?),
MetricType::Set => MetricValue::Set(value_str.parse().ok()?),
MetricType::Gauge => {
let value_str = value_str.split(':').next().unwrap();
MetricValue::Gauge(value_str.parse().ok()?)
}
};
let mut builder = Metric::build(name.to_owned(), value).with_unit(unit);
for component in components {
if let Some('#') = component.chars().next() {
for pair in component.get(1..)?.split(',') {
let mut key_value = pair.splitn(2, ':');
let key = key_value.next()?.to_owned();
let value = key_value.next().unwrap_or_default().to_owned();
builder = builder.with_tag(key, value);
}
}
}
Some(builder.finish())
}
/// Identity of an aggregation bucket within one time slot. Values whose keys are equal are
/// merged into the same bucket.
#[derive(Debug, PartialEq, Eq, Hash)]
struct BucketKey {
    /// Metric type of every value in the bucket.
    ty: MetricType,
    /// Metric name, not yet sanitized.
    name: MetricStr,
    /// Metric unit.
    unit: MetricUnit,
    /// The metric's own tags.
    tags: BTreeMap<MetricStr, MetricStr>,
}

/// Seconds since the UNIX epoch. When used as a bucket key it is floored to a multiple of
/// `BUCKET_INTERVAL`.
type Timestamp = u64;

/// All pending buckets, grouped by time slot (ascending) and then by [`BucketKey`].
type BucketMap = BTreeMap<Timestamp, HashMap<BucketKey, BucketValue>>;

/// Aggregation state guarded by a mutex and shared by the aggregator and its worker.
#[derive(Debug)]
struct SharedAggregatorState {
    /// Buckets not yet flushed.
    buckets: BucketMap,
    /// Sum of `BucketValue::weight` over all pending buckets.
    weight: usize,
    /// Cleared on shutdown: the worker exits and the next `take_buckets` drains everything.
    running: bool,
    /// One-shot request that the next `take_buckets` drain every bucket.
    force_flush: bool,
}
impl SharedAggregatorState {
    /// Creates an empty state that is running and has no pending flush request.
    pub fn new() -> Self {
        Self {
            buckets: BTreeMap::new(),
            weight: 0,
            running: true,
            force_flush: false,
        }
    }

    /// Records `value` in the bucket for `key` in the time slot that contains `timestamp`.
    ///
    /// `timestamp` is in seconds since the UNIX epoch and is floored to a multiple of
    /// `BUCKET_INTERVAL`. The tracked weight grows by however much the bucket grew.
    pub fn add(&mut self, mut timestamp: Timestamp, key: BucketKey, value: MetricValue) {
        // Floor the timestamp to the start of its bucket interval.
        timestamp /= BUCKET_INTERVAL.as_secs();
        timestamp *= BUCKET_INTERVAL.as_secs();

        match self.buckets.entry(timestamp).or_default().entry(key) {
            Entry::Occupied(mut e) => self.weight += e.get_mut().insert(value),
            Entry::Vacant(e) => self.weight += e.insert(value.into()).weight(),
        }
    }

    /// Removes and returns the buckets that are ready to be sent.
    ///
    /// If a flush was forced or the aggregator is shutting down, every bucket is returned
    /// and the force flag is cleared. Otherwise only the time slots that start strictly
    /// before `now - BUCKET_INTERVAL` are returned. Newer slots stay so they can keep
    /// aggregating.
    pub fn take_buckets(&mut self) -> BucketMap {
        if self.force_flush || !self.running {
            self.weight = 0;
            self.force_flush = false;
            return std::mem::take(&mut self.buckets);
        }

        let cutoff = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .saturating_sub(BUCKET_INTERVAL)
            .as_secs();

        // `split_off` returns the slots at or after the cutoff, which must stay. Swap so that
        // `self.buckets` keeps them and `older` holds everything before the cutoff.
        let mut older = self.buckets.split_off(&cutoff);
        std::mem::swap(&mut older, &mut self.buckets);

        let flushed_weight: usize = older
            .values()
            .flat_map(|map| map.values())
            .map(|bucket| bucket.weight())
            .sum();
        self.weight -= flushed_weight;

        older
    }

    /// Current total weight of all pending buckets.
    pub fn weight(&self) -> usize {
        self.weight
    }
}
type TagMap = BTreeMap<MetricStr, MetricStr>;
fn get_default_tags(options: &ClientOptions) -> TagMap {
let mut tags = TagMap::new();
if let Some(ref release) = options.release {
tags.insert("release".into(), release.clone());
}
if let Some(ref environment) = options.environment {
tags.insert("environment".into(), environment.clone());
}
tags
}
/// Serializes aggregated buckets and sends them through the transport.
///
/// Clones share the same state: one clone runs on the background thread, and
/// [`MetricAggregator`] keeps another for synchronous flushes.
#[derive(Clone)]
struct Worker {
    /// Bucket state shared with every other clone of this worker.
    shared: Arc<Mutex<SharedAggregatorState>>,
    /// Client-wide default tags added to serialized metric lines.
    default_tags: TagMap,
    /// Transport used to send the resulting envelopes.
    transport: TransportArc,
}
impl Worker {
    /// Body of the background thread.
    ///
    /// Each iteration waits up to `FLUSH_INTERVAL` (or until unparked), then takes the
    /// buckets that are ready and sends them. Once the shared state is no longer running,
    /// the loop returns without draining.
    pub fn run(self) {
        loop {
            thread::park_timeout(FLUSH_INTERVAL);

            let buckets = {
                let mut guard = self.shared.lock().unwrap();
                if !guard.running {
                    break;
                }
                guard.take_buckets()
            };

            // The lock is released before sending, so producers are not blocked by the
            // transport.
            self.flush_buckets(buckets);
        }
    }

    /// Serializes `buckets` into one statsd envelope item and sends it if a transport is
    /// configured. An empty map sends nothing.
    pub fn flush_buckets(&self, buckets: BucketMap) {
        if buckets.is_empty() {
            return;
        }

        if let Ok(output) = self.format_payload(buckets) {
            let mut envelope = Envelope::new();
            envelope.add_item(EnvelopeItem::Statsd(output));

            if let Some(ref transport) = *self.transport.read().unwrap() {
                transport.send_envelope(envelope);
            }
        }
    }

    /// Renders `buckets` in the statsd-like line format, one line per bucket:
    /// `name[@unit]:value[:value...]|type[|#tag:value,...]|T<timestamp>`.
    ///
    /// Default tags follow the metric's own tags, except keys the metric already sets.
    /// A metric tag therefore overrides a default tag, and no key appears twice on a line.
    fn format_payload(&self, buckets: BucketMap) -> std::io::Result<Vec<u8>> {
        use std::io::Write;
        let mut out = vec![];

        for (timestamp, buckets) in buckets {
            for (key, value) in buckets {
                write!(&mut out, "{}", SafeKey(key.name.as_ref()))?;
                if key.unit != MetricUnit::None {
                    write!(&mut out, "@{}", key.unit)?;
                }

                match value {
                    BucketValue::Counter(c) => {
                        write!(&mut out, ":{}", c)?;
                    }
                    BucketValue::Distribution(d) => {
                        for v in d {
                            write!(&mut out, ":{}", v)?;
                        }
                    }
                    BucketValue::Set(s) => {
                        for v in s {
                            write!(&mut out, ":{}", v)?;
                        }
                    }
                    BucketValue::Gauge(g) => {
                        // Gauges are sent as their full summary: last:min:max:sum:count.
                        write!(
                            &mut out,
                            ":{}:{}:{}:{}:{}",
                            g.last, g.min, g.max, g.sum, g.count
                        )?;
                    }
                }

                write!(&mut out, "|{}", key.ty.as_str())?;

                // Default tags only fill in keys the metric does not set itself.
                let default_tags = self
                    .default_tags
                    .iter()
                    .filter(|(k, _)| !key.tags.contains_key(*k));
                for (i, (k, v)) in key.tags.iter().chain(default_tags).enumerate() {
                    match i {
                        0 => write!(&mut out, "|#")?,
                        _ => write!(&mut out, ",")?,
                    }

                    write!(&mut out, "{}:{}", SafeKey(k.as_ref()), SafeVal(v.as_ref()))?;
                }

                writeln!(&mut out, "|T{}", timestamp)?;
            }
        }

        Ok(out)
    }
}
impl fmt::Debug for Worker {
    // The transport is rendered as a fixed placeholder, and the shared state is omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("Worker");
        builder.field("transport", &format_args!("ArcTransport"));
        builder.field("default_tags", &self.default_tags);
        builder.finish()
    }
}
/// Client-side metric aggregator.
///
/// Metrics are bucketed in memory and flushed from a dedicated background thread, or
/// synchronously via `flush`.
#[derive(Debug)]
pub(crate) struct MetricAggregator {
    /// Handle to the shared state, used to add metrics and for synchronous flushes.
    local_worker: Worker,
    /// The background `sentry-metrics` thread; taken and joined on drop.
    handle: Option<JoinHandle<()>>,
}
impl MetricAggregator {
pub fn new(transport: TransportArc, options: &ClientOptions) -> Self {
let worker = Worker {
shared: Arc::new(Mutex::new(SharedAggregatorState::new())),
default_tags: get_default_tags(options),
transport,
};
let local_worker = worker.clone();
let handle = thread::Builder::new()
.name("sentry-metrics".into())
.spawn(move || worker.run())
.expect("failed to spawn thread");
Self {
local_worker,
handle: Some(handle),
}
}
pub fn add(&self, metric: Metric) {
let Metric {
name,
unit,
value,
tags,
time,
} = metric;
let timestamp = time
.unwrap_or_else(SystemTime::now)
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let key = BucketKey {
ty: value.ty(),
name,
unit,
tags,
};
let mut guard = self.local_worker.shared.lock().unwrap();
guard.add(timestamp, key, value);
if guard.weight() > MAX_WEIGHT {
if let Some(ref handle) = self.handle {
guard.force_flush = true;
handle.thread().unpark();
}
}
}
pub fn flush(&self) {
let buckets = {
let mut guard = self.local_worker.shared.lock().unwrap();
guard.force_flush = true;
guard.take_buckets()
};
self.local_worker.flush_buckets(buckets);
}
}
impl Drop for MetricAggregator {
    // Shutdown sequence:
    // 1. Mark the state as stopped and send all remaining buckets from this thread.
    // 2. Wake the background thread so it sees `running == false` and exits.
    // 3. Join it.
    fn drop(&mut self) {
        let remaining = {
            let mut state = self.local_worker.shared.lock().unwrap();
            state.running = false;
            state.take_buckets()
        };
        self.local_worker.flush_buckets(remaining);

        let Some(handle) = self.handle.take() else {
            return;
        };
        handle.thread().unpark();
        handle.join().unwrap();
    }
}
/// Writes `string` to `f`, replacing each run of consecutive characters rejected by `check`
/// with a single `_`.
fn safe_fmt<F>(f: &mut fmt::Formatter<'_>, string: &str, mut check: F) -> fmt::Result
where
    F: FnMut(char) -> bool,
{
    // Whether the previous character was replaced; used to collapse runs into one `_`.
    let mut in_invalid_run = false;
    for c in string.chars() {
        match (check(c), in_invalid_run) {
            (true, _) => {
                in_invalid_run = false;
                f.write_char(c)?;
            }
            (false, false) => {
                in_invalid_run = true;
                f.write_char('_')?;
            }
            (false, true) => {}
        }
    }
    Ok(())
}

/// Display adapter for metric names and tag keys.
///
/// Keeps ASCII alphanumerics and `_`, `-`, `.`, `/`; each run of other characters becomes
/// one `_`.
struct SafeKey<'s>(&'s str);

impl<'s> fmt::Display for SafeKey<'s> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let allowed = |c: char| c.is_ascii_alphanumeric() || "_-./".contains(c);
        safe_fmt(f, self.0, allowed)
    }
}

/// Display adapter for tag values.
///
/// Keeps Unicode alphanumerics and `_ : / @ . { } [ ] $ -`; each run of other characters
/// becomes one `_`.
struct SafeVal<'s>(&'s str);

impl<'s> fmt::Display for SafeVal<'s> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        const EXTRA: &str = "_:/@.{}[]$-";
        safe_fmt(f, self.0, |c| c.is_alphanumeric() || EXTRA.contains(c))
    }
}
#[cfg(test)]
mod tests {
    use crate::test::{with_captured_envelopes, with_captured_envelopes_options};
    use crate::ClientOptions;

    use super::*;

    /// Returns the current time together with the start of the bucket it falls into. That
    /// bucket start is the `T` timestamp the serialized payload is expected to carry.
    fn current_time() -> (SystemTime, u64) {
        let now = SystemTime::now();
        let timestamp = now.duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
        // Floor with the aggregator's own interval so the tests cannot drift out of sync.
        let interval = BUCKET_INTERVAL.as_secs();
        let timestamp = timestamp / interval * interval;
        (now, timestamp)
    }

    /// Asserts that exactly one envelope was captured and returns the trimmed statsd
    /// payload of its first item.
    fn get_single_metrics(envelopes: &[Envelope]) -> &str {
        assert_eq!(envelopes.len(), 1, "expected exactly one envelope");
        let mut items = envelopes[0].items();
        let Some(EnvelopeItem::Statsd(payload)) = items.next() else {
            panic!("expected metrics item");
        };
        std::str::from_utf8(payload).unwrap().trim()
    }

    #[test]
    fn test_tags() {
        let (time, ts) = current_time();
        let envelopes = with_captured_envelopes(|| {
            Metric::count("my.metric")
                .with_tag("foo", "bar")
                .with_tag("and", "more")
                .with_time(time)
                .send();
        });
        let metrics = get_single_metrics(&envelopes);
        // Tags are emitted ordered by key, regardless of insertion order.
        assert_eq!(metrics, format!("my.metric:1|c|#and:more,foo:bar|T{ts}"));
    }

    #[test]
    fn test_unit() {
        let (time, ts) = current_time();
        let envelopes = with_captured_envelopes(|| {
            Metric::count("my.metric")
                .with_time(time)
                .with_unit("custom")
                .send();
        });
        let metrics = get_single_metrics(&envelopes);
        assert_eq!(metrics, format!("my.metric@custom:1|c|T{ts}"));
    }

    #[test]
    fn test_metric_sanitation() {
        let (time, ts) = current_time();
        let envelopes = with_captured_envelopes(|| {
            Metric::count("my$$$metric").with_time(time).send();
        });
        let metrics = get_single_metrics(&envelopes);
        assert_eq!(metrics, format!("my_metric:1|c|T{ts}"));
    }

    #[test]
    fn test_tag_sanitation() {
        let (time, ts) = current_time();
        let envelopes = with_captured_envelopes(|| {
            Metric::count("my.metric")
                .with_tag("foo-bar$$$blub", "%$föö{}")
                .with_time(time)
                .send();
        });
        let metrics = get_single_metrics(&envelopes);
        assert_eq!(
            metrics,
            format!("my.metric:1|c|#foo-bar_blub:_$föö{{}}|T{ts}")
        );
    }

    #[test]
    fn test_own_namespace() {
        let (time, ts) = current_time();
        let envelopes = with_captured_envelopes(|| {
            Metric::count("ns/my.metric").with_time(time).send();
        });
        let metrics = get_single_metrics(&envelopes);
        assert_eq!(metrics, format!("ns/my.metric:1|c|T{ts}"));
    }

    #[test]
    fn test_default_tags() {
        let (time, ts) = current_time();
        let options = ClientOptions {
            release: Some("myapp@1.0.0".into()),
            environment: Some("production".into()),
            ..Default::default()
        };
        let envelopes = with_captured_envelopes_options(
            || {
                Metric::count("requests")
                    .with_tag("foo", "bar")
                    .with_time(time)
                    .send();
            },
            options,
        );
        let metrics = get_single_metrics(&envelopes);
        // Default tags come after the metric's own tags.
        assert_eq!(
            metrics,
            format!("requests:1|c|#foo:bar,environment:production,release:myapp@1.0.0|T{ts}")
        );
    }

    #[test]
    fn test_counter() {
        let (time, ts) = current_time();
        let envelopes = with_captured_envelopes(|| {
            Metric::count("my.metric").with_time(time).send();
            Metric::incr("my.metric", 2.0).with_time(time).send();
        });
        let metrics = get_single_metrics(&envelopes);
        assert_eq!(metrics, format!("my.metric:3|c|T{ts}"));
    }

    #[test]
    fn test_timing() {
        let (time, ts) = current_time();
        let envelopes = with_captured_envelopes(|| {
            Metric::timing("my.metric", Duration::from_millis(200))
                .with_time(time)
                .send();
            Metric::timing("my.metric", Duration::from_millis(100))
                .with_time(time)
                .send();
        });
        let metrics = get_single_metrics(&envelopes);
        assert_eq!(metrics, format!("my.metric@second:0.2:0.1|d|T{ts}"));
    }

    #[test]
    fn test_distribution() {
        let (time, ts) = current_time();
        let envelopes = with_captured_envelopes(|| {
            Metric::distribution("my.metric", 2.0)
                .with_time(time)
                .send();
            Metric::distribution("my.metric", 1.0)
                .with_time(time)
                .send();
        });
        let metrics = get_single_metrics(&envelopes);
        // Distribution samples keep their insertion order.
        assert_eq!(metrics, format!("my.metric:2:1|d|T{ts}"));
    }

    #[test]
    fn test_set() {
        let (time, ts) = current_time();
        let envelopes = with_captured_envelopes(|| {
            Metric::set("my.metric", "hello").with_time(time).send();
            Metric::set("my.metric", "hello").with_time(time).send();
            Metric::set("my.metric", "world").with_time(time).send();
        });
        let metrics = get_single_metrics(&envelopes);
        // The duplicate "hello" is stored once. The expected numbers are `DefaultHasher`
        // outputs, which the standard library does not promise to keep stable across Rust
        // releases.
        assert_eq!(metrics, format!("my.metric:3410894750:3817476724|s|T{ts}"));
    }

    #[test]
    fn test_gauge() {
        let (time, ts) = current_time();
        let envelopes = with_captured_envelopes(|| {
            Metric::gauge("my.metric", 2.0).with_time(time).send();
            Metric::gauge("my.metric", 1.0).with_time(time).send();
            Metric::gauge("my.metric", 1.5).with_time(time).send();
        });
        let metrics = get_single_metrics(&envelopes);
        // Serialized as last:min:max:sum:count.
        assert_eq!(metrics, format!("my.metric:1.5:1:2:4.5:3|g|T{ts}"));
    }

    #[test]
    fn test_multiple() {
        let (time, ts) = current_time();
        let envelopes = with_captured_envelopes(|| {
            Metric::count("my.metric").with_time(time).send();
            Metric::distribution("my.dist", 2.0).with_time(time).send();
        });
        let metrics = get_single_metrics(&envelopes);
        // Buckets within one time slot live in a `HashMap`, so the line order is unspecified.
        let counter = format!("my.metric:1|c|T{ts}");
        let dist = format!("my.dist:2|d|T{ts}");
        assert!(
            metrics.contains(&counter),
            "{counter:?} missing from payload:\n{metrics}"
        );
        assert!(
            metrics.contains(&dist),
            "{dist:?} missing from payload:\n{metrics}"
        );
    }

    #[test]
    fn test_regression_parse_statsd() {
        // A serialized gauge summary must parse back, keeping the `last` value.
        let payload = "docker.net.bytes_rcvd:27763.20237096717:27763.20237096717:27763.20237096717:27763.20237096717:1|g|#container_id:97df61f5c55b58ec9c04da3e03edc8a875ec90eb405eb5645ad9a86d0a7cd3ee,container_name:app_sidekiq_1";
        let metric = Metric::parse_statsd(payload).unwrap();
        assert_eq!(metric.name, "docker.net.bytes_rcvd");
        assert_eq!(metric.value, MetricValue::Gauge(27763.20237096717));
        assert_eq!(
            metric.tags["container_id"],
            "97df61f5c55b58ec9c04da3e03edc8a875ec90eb405eb5645ad9a86d0a7cd3ee"
        );
        assert_eq!(metric.tags["container_name"], "app_sidekiq_1");
    }
}