sentry_core/metrics/
mod.rs

1//! Utilities to track metrics in Sentry.
2//!
3//! Metrics are numerical values that can track anything about your environment over time, from
4//! latency to error rates to user signups.
5//!
6//! Metrics at Sentry come in different flavors, in order to help you track your data in the most
7//! efficient and cost-effective way. The types of metrics we currently support are:
8//!
9//!  - **Counters** track a value that can only be incremented.
//!  - **Distributions** track a list of values over time on which you can perform aggregations
//!    like max, min, avg.
12//!  - **Gauges** track a value that can go up and down.
13//!  - **Sets** track a set of values on which you can perform aggregations such as count_unique.
14//!
15//! For more information on metrics in Sentry, see [our docs].
16//!
17//! # Usage
18//!
19//! To collect a metric, use the [`Metric`] struct to capture all relevant properties of your
20//! metric. Then, use [`send`](Metric::send) to send the metric to Sentry:
21//!
22//! ```
23//! use std::time::Duration;
24//! use sentry::metrics::Metric;
25//!
26//! Metric::count("requests")
27//!     .with_tag("method", "GET")
28//!     .send();
29//!
30//! Metric::timing("request.duration", Duration::from_millis(17))
31//!     .with_tag("status_code", "200")
32//!     // unit is added automatically by timing
33//!     .send();
34//!
35//! Metric::set("site.visitors", "user1")
36//!     .with_unit("user")
37//!     .send();
38//! ```
39//!
40//! # Usage with Cadence
41//!
42//! [`cadence`] is a popular Statsd client for Rust and can be used to send metrics to Sentry. To
43//! use Sentry directly with `cadence`, see the [`sentry-cadence`](crate::cadence) documentation.
44//!
45//! [our docs]: https://develop.sentry.dev/services/metrics/
46
47mod normalization;
48
49use std::borrow::Cow;
50use std::collections::hash_map::Entry;
51use std::collections::{BTreeMap, BTreeSet, HashMap};
52use std::fmt::{self, Display};
53use std::sync::{Arc, Mutex};
54use std::thread::{self, JoinHandle};
55use std::time::{Duration, SystemTime, UNIX_EPOCH};
56
57use sentry_types::protocol::latest::{Envelope, EnvelopeItem};
58
59use crate::client::TransportArc;
60use crate::{ClientOptions, Hub};
61
62pub use crate::units::*;
63
/// Width of each aggregation window. Metric timestamps are floored to a multiple of this before
/// they are assigned to a bucket.
const BUCKET_INTERVAL: Duration = Duration::from_secs(10);
/// How long the background worker parks between flush attempts, unless it is woken earlier.
const FLUSH_INTERVAL: Duration = Duration::from_secs(5);
/// Total bucket weight above which adding a metric requests an early (forced) flush.
const MAX_WEIGHT: usize = 100_000;

/// Type alias for strings used in [`Metric`] for names and tags.
pub type MetricStr = Cow<'static, str>;

/// Type used for [`MetricValue::Counter`].
pub type CounterValue = f64;

/// Type used for [`MetricValue::Distribution`].
pub type DistributionValue = f64;

/// Type used for [`MetricValue::Set`]: a 32-bit hash of the original set member.
pub type SetValue = u32;

/// Type used for [`MetricValue::Gauge`].
pub type GaugeValue = f64;
82
/// The value of a [`Metric`], indicating its type.
///
/// The variant selects the metric type that is reported to Sentry (`c`, `d`, `s` or `g` in the
/// statsd encoding) and determines how repeated values for the same metric are aggregated.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum MetricValue {
    /// Counts instances of an event.
    ///
    /// Counters can be incremented and decremented. The default operation is to increment a counter
    /// by `1`, although increments by larger values and even floating point values are possible.
    /// Within one aggregation bucket, all reported values are summed.
    ///
    /// # Example
    ///
    /// ```
    /// use sentry::metrics::{Metric, MetricValue};
    ///
    /// Metric::build("my.counter", MetricValue::Counter(1.0)).send();
    /// ```
    Counter(CounterValue),

    /// Builds a statistical distribution over values reported.
    ///
    /// Based on individual reported values, distributions allow to query the maximum, minimum, or
    /// average of the reported values, as well as statistical quantiles. With an increasing number
    /// of values in the distribution, its accuracy becomes approximate. Within one aggregation
    /// bucket, every reported value is kept.
    ///
    /// # Example
    ///
    /// ```
    /// use sentry::metrics::{Metric, MetricValue};
    ///
    /// Metric::build("my.distribution", MetricValue::Distribution(42.0)).send();
    /// ```
    Distribution(DistributionValue),

    /// Counts the number of unique reported values.
    ///
    /// Sets allow sending arbitrary discrete values, including strings, and store the deduplicated
    /// count. With an increasing number of unique values in the set, its accuracy becomes
    /// approximate. It is not possible to query individual values from a set.
    ///
    /// # Example
    ///
    /// To create a set value, use [`MetricValue::set_from_str`] or
    /// [`MetricValue::set_from_display`]. These functions convert the provided argument into a
    /// unique hash value, which is then used as the set value.
    ///
    /// ```
    /// use sentry::metrics::{Metric, MetricValue};
    ///
    /// Metric::build("my.set", MetricValue::set_from_str("foo")).send();
    /// ```
    Set(SetValue),

    /// Stores absolute snapshots of values.
    ///
    /// In addition to plain [counters](Self::Counter), gauges store a snapshot of the maximum,
    /// minimum and sum of all values, as well as the last reported value. Note that the "last"
    /// component of this aggregation is not commutative. Which value is preserved as last value is
    /// implementation-defined; this client keeps the most recently aggregated one.
    ///
    /// # Example
    ///
    /// ```
    /// use sentry::metrics::{Metric, MetricValue};
    ///
    /// Metric::build("my.gauge", MetricValue::Gauge(42.0)).send();
    /// ```
    Gauge(GaugeValue),
}
150
151impl MetricValue {
152    /// Returns a set value representing the given string.
153    pub fn set_from_str(string: &str) -> Self {
154        Self::Set(hash_set_value(string))
155    }
156
157    /// Returns a set value representing the given argument.
158    pub fn set_from_display(display: impl fmt::Display) -> Self {
159        Self::Set(hash_set_value(&display.to_string()))
160    }
161
162    /// Returns the type of the metric value.
163    fn ty(&self) -> MetricType {
164        match self {
165            Self::Counter(_) => MetricType::Counter,
166            Self::Distribution(_) => MetricType::Distribution,
167            Self::Gauge(_) => MetricType::Gauge,
168            Self::Set(_) => MetricType::Set,
169        }
170    }
171}
172
173impl Display for MetricValue {
174    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
175        match self {
176            Self::Counter(v) => write!(f, "{}", v),
177            Self::Distribution(v) => write!(f, "{}", v),
178            Self::Gauge(v) => write!(f, "{}", v),
179            Self::Set(v) => write!(f, "{}", v),
180        }
181    }
182}
183
184/// Hashes the given set value.
185///
186/// Sets only guarantee 32-bit accuracy, but arbitrary strings are allowed on the protocol. Upon
187/// parsing, they are hashed and only used as hashes subsequently.
188fn hash_set_value(string: &str) -> u32 {
189    crc32fast::hash(string.as_bytes())
190}
191
/// The kind of a metric, as encoded in the statsd type field.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
enum MetricType {
    Counter,
    Distribution,
    Set,
    Gauge,
}

impl MetricType {
    /// Returns the single-letter statsd code emitted for this type.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Counter => "c",
            Self::Distribution => "d",
            Self::Set => "s",
            Self::Gauge => "g",
        }
    }
}

impl fmt::Display for MetricType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let code = self.as_str();
        f.write_str(code)
    }
}

impl std::str::FromStr for MetricType {
    type Err = ();

    /// Parses a statsd type code.
    ///
    /// Besides the canonical codes, the legacy aliases `m` (meter → counter) and `h`/`ms`
    /// (histogram/timer → distribution) are accepted.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "c" | "m" => Ok(Self::Counter),
            "h" | "d" | "ms" => Ok(Self::Distribution),
            "s" => Ok(Self::Set),
            "g" => Ok(Self::Gauge),
            _ => Err(()),
        }
    }
}
231
232/// A snapshot of values.
233#[derive(Clone, Copy, Debug, PartialEq)]
234struct GaugeSummary {
235    /// The last value reported in the bucket.
236    ///
237    /// This aggregation is not commutative.
238    pub last: GaugeValue,
239    /// The minimum value reported in the bucket.
240    pub min: GaugeValue,
241    /// The maximum value reported in the bucket.
242    pub max: GaugeValue,
243    /// The sum of all values reported in the bucket.
244    pub sum: GaugeValue,
245    /// The number of times this bucket was updated with a new value.
246    pub count: u64,
247}
248
249impl GaugeSummary {
250    /// Creates a gauge snapshot from a single value.
251    pub fn single(value: GaugeValue) -> Self {
252        Self {
253            last: value,
254            min: value,
255            max: value,
256            sum: value,
257            count: 1,
258        }
259    }
260
261    /// Inserts a new value into the gauge.
262    pub fn insert(&mut self, value: GaugeValue) {
263        self.last = value;
264        self.min = self.min.min(value);
265        self.max = self.max.max(value);
266        self.sum += value;
267        self.count += 1;
268    }
269}
270
271/// The aggregated value of a [`Metric`] bucket.
272#[derive(Debug)]
273enum BucketValue {
274    Counter(CounterValue),
275    Distribution(Vec<DistributionValue>),
276    Set(BTreeSet<SetValue>),
277    Gauge(GaugeSummary),
278}
279
280impl BucketValue {
281    /// Inserts a new value into the bucket and returns the added weight.
282    pub fn insert(&mut self, value: MetricValue) -> usize {
283        match (self, value) {
284            (Self::Counter(c1), MetricValue::Counter(c2)) => {
285                *c1 += c2;
286                0
287            }
288            (Self::Distribution(d1), MetricValue::Distribution(d2)) => {
289                d1.push(d2);
290                1
291            }
292            (Self::Set(s1), MetricValue::Set(s2)) => {
293                if s1.insert(s2) {
294                    1
295                } else {
296                    0
297                }
298            }
299            (Self::Gauge(g1), MetricValue::Gauge(g2)) => {
300                g1.insert(g2);
301                0
302            }
303            _ => panic!("invalid metric type"),
304        }
305    }
306
307    /// Returns the number of values stored in this bucket.
308    pub fn weight(&self) -> usize {
309        match self {
310            BucketValue::Counter(_) => 1,
311            BucketValue::Distribution(v) => v.len(),
312            BucketValue::Set(v) => v.len(),
313            BucketValue::Gauge(_) => 5,
314        }
315    }
316}
317
318impl From<MetricValue> for BucketValue {
319    fn from(value: MetricValue) -> Self {
320        match value {
321            MetricValue::Counter(v) => Self::Counter(v),
322            MetricValue::Distribution(v) => Self::Distribution(vec![v]),
323            MetricValue::Gauge(v) => Self::Gauge(GaugeSummary::single(v)),
324            MetricValue::Set(v) => Self::Set(BTreeSet::from([v])),
325        }
326    }
327}
328
/// A metric: a numeric value plus the metadata (name, unit, tags, timestamp) sent to Sentry.
///
/// # Units
///
/// To make the most out of metrics in Sentry, consider assigning a unit during construction. This
/// can be achieved using the [`with_unit`](MetricBuilder::with_unit) builder method. See
/// [`MetricUnit`] for the available units.
///
/// ```
/// use sentry::metrics::{Metric, InformationUnit};
///
/// Metric::distribution("request.size", 47.2)
///     .with_unit(InformationUnit::Byte)
///     .send();
/// ```
///
/// # Sending Metrics
///
/// Metrics can be sent to Sentry directly using the [`send`](MetricBuilder::send) method on the
/// builder returned by the constructors. This will send the metric to the
/// [`Client`](crate::Client) on the current [`Hub`]. If there is no client on the current hub,
/// the metric is dropped.
///
/// ```
/// use sentry::metrics::Metric;
///
/// Metric::count("requests")
///     .with_tag("method", "GET")
///     .send();
/// ```
///
/// # Sending to a Custom Client
///
/// Metrics can also be sent to a custom client. This is useful if you want to send metrics to a
/// different Sentry project or with different configuration. To do so, finish building the metric
/// and then call [`add_metric`](crate::Client::add_metric) on the client:
///
/// ```
/// use sentry::Hub;
/// use sentry::metrics::Metric;
///
/// let metric = Metric::count("requests")
///    .with_tag("method", "GET")
///    .finish();
///
/// // Obtain a client from somewhere
/// if let Some(client) = Hub::current().client() {
///     client.add_metric(metric);
/// }
/// ```
#[derive(Debug)]
pub struct Metric {
    /// The name of the metric, identifying it in Sentry.
    ///
    /// The name should consist of alphanumeric characters and `_`, `-`, and `.`. It is stored as
    /// given; characters outside this set are sanitized when the metric is serialized.
    name: MetricStr,
    /// The unit of the value. Defaults to [`MetricUnit::None`].
    unit: MetricUnit,
    /// The value; its variant also determines the metric type.
    value: MetricValue,
    /// Tags (dimensions) attached to this metric.
    tags: BTreeMap<MetricStr, MetricStr>,
    /// Explicit timestamp. When `None`, the current time is used at the moment the metric is
    /// aggregated or converted into an envelope.
    time: Option<SystemTime>,
}
389
390impl Metric {
391    /// Creates a new metric with the stated name and value.
392    ///
393    /// The provided name identifies the metric in Sentry. It should consist of alphanumeric
394    /// characters and `_`, `-`, and `.`. While a single forward slash (`/`) is also allowed in
395    /// metric names, it has a special meaning and should not be used in regular metric names. All
396    /// characters that do not match this criteria are sanitized.
397    ///
398    /// The value of the metric determines its type. See the [struct-level](self) docs and
399    /// constructor methods for examples on how to build metrics.
400    pub fn build(name: impl Into<MetricStr>, value: MetricValue) -> MetricBuilder {
401        let metric = Metric {
402            name: name.into(),
403            unit: MetricUnit::None,
404            value,
405            tags: BTreeMap::new(),
406            time: None,
407        };
408
409        MetricBuilder { metric }
410    }
411
412    /// Parses a metric from a StatsD string.
413    ///
414    /// This supports regular StatsD payloads with an extension for tags. In the below example, tags
415    /// are optional:
416    ///
417    /// ```plain
418    /// <metricname>:<value>|<type>|#<tag1>:<value1>,<tag2>:<value2>
419    /// ```
420    ///
421    /// Units are encoded into the metric name, separated by an `@`:
422    ///
423    /// ```plain
424    /// <metricname>@<unit>:<value>|<type>|#<tag1>:<value1>,<tag2>:<value2>
425    /// ```
426    pub fn parse_statsd(string: &str) -> Result<Self, ParseMetricError> {
427        parse_metric_opt(string).ok_or(ParseMetricError(()))
428    }
429
430    /// Builds a metric that increments a [counter](MetricValue::Counter) by the given value.
431    ///
432    /// # Example
433    ///
434    /// ```
435    /// use sentry::metrics::{Metric};
436    ///
437    /// Metric::incr("operation.total_values", 7.0).send();
438    /// ```
439    pub fn incr(name: impl Into<MetricStr>, value: f64) -> MetricBuilder {
440        Self::build(name, MetricValue::Counter(value))
441    }
442
443    /// Builds a metric that [counts](MetricValue::Counter) the single occurrence of an event.
444    ///
445    /// # Example
446    ///
447    /// ```
448    /// use sentry::metrics::{Metric};
449    ///
450    /// Metric::count("requests").send();
451    /// ```
452    pub fn count(name: impl Into<MetricStr>) -> MetricBuilder {
453        Self::build(name, MetricValue::Counter(1.0))
454    }
455
456    /// Builds a metric that tracks the duration of an operation.
457    ///
458    /// This is a [distribution](MetricValue::Distribution) metric that is tracked in seconds.
459    ///
460    /// # Example
461    ///
462    /// ```
463    /// use std::time::Duration;
464    /// use sentry::metrics::{Metric};
465    ///
466    /// Metric::timing("operation", Duration::from_secs(1)).send();
467    /// ```
468    pub fn timing(name: impl Into<MetricStr>, timing: Duration) -> MetricBuilder {
469        Self::build(name, MetricValue::Distribution(timing.as_secs_f64()))
470            .with_unit(DurationUnit::Second)
471    }
472
473    /// Builds a metric that tracks the [distribution](MetricValue::Distribution) of values.
474    ///
475    /// # Example
476    ///
477    /// ```
478    /// use sentry::metrics::{Metric};
479    ///
480    /// Metric::distribution("operation.batch_size", 42.0).send();
481    /// ```
482    pub fn distribution(name: impl Into<MetricStr>, value: f64) -> MetricBuilder {
483        Self::build(name, MetricValue::Distribution(value))
484    }
485
486    /// Builds a metric that tracks the [unique number](MetricValue::Set) of values provided.
487    ///
488    /// See [`MetricValue`] for more ways to construct sets.
489    ///
490    /// # Example
491    ///
492    /// ```
493    /// use sentry::metrics::{Metric};
494    ///
495    /// Metric::set("users", "user1").send();
496    /// ```
497    pub fn set(name: impl Into<MetricStr>, string: &str) -> MetricBuilder {
498        Self::build(name, MetricValue::set_from_str(string))
499    }
500
501    /// Builds a metric that tracks the [snapshot](MetricValue::Gauge) of provided values.
502    ///
503    /// # Example
504    ///
505    /// ```
506    /// use sentry::metrics::{Metric};
507    ///
508    /// Metric::gauge("cache.size", 42.0).send();
509    /// ```
510    pub fn gauge(name: impl Into<MetricStr>, value: f64) -> MetricBuilder {
511        Self::build(name, MetricValue::Gauge(value))
512    }
513
514    /// Sends the metric to the current client.
515    ///
516    /// When building a metric, you can use [`MetricBuilder::send`] to send the metric directly. If
517    /// there is no client on the current [`Hub`], the metric is dropped.
518    pub fn send(self) {
519        if let Some(client) = Hub::current().client() {
520            client.add_metric(self);
521        }
522    }
523
524    /// Convert the metric into an [`Envelope`] containing a single [`EnvelopeItem::Statsd`].
525    pub fn to_envelope(self) -> Envelope {
526        let timestamp = self
527            .time
528            .unwrap_or(SystemTime::now())
529            .duration_since(UNIX_EPOCH)
530            .unwrap_or_default()
531            .as_secs();
532        let data = format!(
533            "{}@{}:{}|{}|#{}|T{}",
534            normalization::normalize_name(self.name.as_ref()),
535            normalization::normalize_unit(self.unit.to_string().as_ref()),
536            self.value,
537            self.value.ty(),
538            normalization::normalize_tags(&self.tags),
539            timestamp
540        );
541        EnvelopeItem::Statsd(data.into_bytes()).into()
542    }
543}
544
545/// A builder for metrics.
546///
547/// Use one of the [`Metric`] constructors to create a new builder. See the struct-level docs for
548/// examples of how to build metrics.
549#[must_use]
550#[derive(Debug)]
551pub struct MetricBuilder {
552    metric: Metric,
553}
554
555impl MetricBuilder {
556    /// Sets the unit for the metric.
557    ///
558    /// The unit augments the metric value by giving it a magnitude and semantics. Some units have
559    /// special support when rendering metrics or their values in Sentry, such as for timings. See
560    /// [`MetricUnit`] for more information on the supported units. The unit can be set to
561    /// [`MetricUnit::None`] to indicate that the metric has no unit, or to [`MetricUnit::Custom`]
562    /// to indicate a user-defined unit.
563    ///
564    /// By default, the unit is set to [`MetricUnit::None`].
565    pub fn with_unit(mut self, unit: impl Into<MetricUnit>) -> Self {
566        self.metric.unit = unit.into();
567        self
568    }
569
570    /// Adds a tag to the metric.
571    ///
572    /// Tags allow you to add dimensions to metrics. They are key-value pairs that can be filtered
573    /// or grouped by in Sentry.
574    ///
575    /// When sent to Sentry via [`MetricBuilder::send`] or when added to a
576    /// [`Client`](crate::Client), the client may add default tags to the metrics, such as the
577    /// `release` or the `environment` from the Scope.
578    pub fn with_tag(mut self, name: impl Into<MetricStr>, value: impl Into<MetricStr>) -> Self {
579        self.metric.tags.insert(name.into(), value.into());
580        self
581    }
582
583    /// Adds multiple tags to the metric.
584    ///
585    /// Tags allow you to add dimensions to metrics. They are key-value pairs that can be filtered
586    /// or grouped by in Sentry.
587    ///
588    /// When sent to Sentry via [`MetricBuilder::send`] or when added to a
589    /// [`Client`](crate::Client), the client may add default tags to the metrics, such as the
590    /// `release` or the `environment` from the Scope.
591    pub fn with_tags<T, K, V>(mut self, tags: T) -> Self
592    where
593        T: IntoIterator<Item = (K, V)>,
594        K: Into<MetricStr>,
595        V: Into<MetricStr>,
596    {
597        for (k, v) in tags {
598            self.metric.tags.insert(k.into(), v.into());
599        }
600        self
601    }
602
603    /// Sets the timestamp for the metric.
604    ///
605    /// By default, the timestamp is set to the current time when the metric is built or sent.
606    pub fn with_time(mut self, time: SystemTime) -> Self {
607        self.metric.time = Some(time);
608        self
609    }
610
611    /// Builds the metric.
612    pub fn finish(self) -> Metric {
613        self.metric
614    }
615
616    /// Sends the metric to the current client.
617    ///
618    /// This is a shorthand for `.finish().send()`. If there is no client on the current [`Hub`],
619    /// the metric is dropped.
620    pub fn send(self) {
621        self.finish().send()
622    }
623}
624
/// Error returned by [`Metric::parse_statsd`] when the input is not a valid metric string.
///
/// The error carries no details; it only signals that parsing failed.
#[derive(Debug)]
pub struct ParseMetricError(());

impl fmt::Display for ParseMetricError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "invalid metric string")
    }
}

impl std::error::Error for ParseMetricError {}
636
637fn parse_metric_opt(string: &str) -> Option<Metric> {
638    let mut components = string.split('|');
639
640    let (mri_str, value_str) = components.next()?.split_once(':')?;
641    let (name, unit) = match mri_str.split_once('@') {
642        Some((name, unit_str)) => (name, unit_str.parse().ok()?),
643        None => (mri_str, MetricUnit::None),
644    };
645
646    let ty = components.next().and_then(|s| s.parse().ok())?;
647    let value = match ty {
648        MetricType::Counter => MetricValue::Counter(value_str.parse().ok()?),
649        MetricType::Distribution => MetricValue::Distribution(value_str.parse().ok()?),
650        MetricType::Set => MetricValue::Set(value_str.parse().ok()?),
651        MetricType::Gauge => {
652            // Gauge values are serialized as `last:min:max:sum:count`. We want to be able
653            // to parse those strings back, so we just take the first colon-separated segment.
654            let value_str = value_str.split(':').next().unwrap();
655            MetricValue::Gauge(value_str.parse().ok()?)
656        }
657    };
658
659    let mut builder = Metric::build(name.to_owned(), value).with_unit(unit);
660
661    for component in components {
662        if let Some('#') = component.chars().next() {
663            for pair in component.get(1..)?.split(',') {
664                let mut key_value = pair.splitn(2, ':');
665
666                let key = key_value.next()?.to_owned();
667                let value = key_value.next().unwrap_or_default().to_owned();
668
669                builder = builder.with_tag(key, value);
670            }
671        }
672    }
673
674    Some(builder.finish())
675}
676
/// Composite bucket key for [`BucketMap`].
///
/// Within one time window, two metrics are aggregated into the same bucket only if their type,
/// name, unit and tags are all equal.
#[derive(Debug, PartialEq, Eq, Hash)]
struct BucketKey {
    /// The metric type. Because it is part of the key, a bucket only ever receives values of its
    /// own type, which `BucketValue::insert` relies on.
    ty: MetricType,
    /// The metric name as provided by the caller; normalization happens on serialization.
    name: MetricStr,
    /// The unit of the metric.
    unit: MetricUnit,
    /// The metric's own tags. Client default tags are merged in only when serializing.
    tags: BTreeMap<MetricStr, MetricStr>,
}

/// UNIX timestamp in whole seconds, used for buckets.
type Timestamp = u64;

/// A nested map storing metric buckets.
///
/// This map consists of two levels:
///  1. The UNIX timestamp of each bucket window, floored to a multiple of the bucket interval.
///  2. The metric buckets themselves with a corresponding timestamp.
///
/// This structure allows for efficient dequeueing of buckets that are older than a certain
/// threshold. The buckets are dequeued in order of their timestamp, so the oldest buckets are
/// dequeued first.
type BucketMap = BTreeMap<Timestamp, HashMap<BucketKey, BucketValue>>;
699
700#[derive(Debug)]
701struct SharedAggregatorState {
702    buckets: BucketMap,
703    weight: usize,
704    running: bool,
705    force_flush: bool,
706}
707
708impl SharedAggregatorState {
709    pub fn new() -> Self {
710        Self {
711            buckets: BTreeMap::new(),
712            weight: 0,
713            running: true,
714            force_flush: false,
715        }
716    }
717
718    /// Adds a new bucket to the aggregator.
719    ///
720    /// The bucket timestamp is rounded to the nearest bucket interval. Note that this does NOT
721    /// automatically flush the aggregator if the weight exceeds the weight threshold.
722    pub fn add(&mut self, mut timestamp: Timestamp, key: BucketKey, value: MetricValue) {
723        // Floor timestamp to bucket interval
724        timestamp /= BUCKET_INTERVAL.as_secs();
725        timestamp *= BUCKET_INTERVAL.as_secs();
726
727        match self.buckets.entry(timestamp).or_default().entry(key) {
728            Entry::Occupied(mut e) => self.weight += e.get_mut().insert(value),
729            Entry::Vacant(e) => self.weight += e.insert(value.into()).weight(),
730        }
731    }
732
733    /// Removes and returns all buckets that are ready to flush.
734    ///
735    /// Buckets are ready to flush as soon as their time window has closed. For example, a bucket
736    /// from timestamps `[4600, 4610)` is ready to flush immediately at `4610`.
737    pub fn take_buckets(&mut self) -> BucketMap {
738        if self.force_flush || !self.running {
739            self.weight = 0;
740            self.force_flush = false;
741            std::mem::take(&mut self.buckets)
742        } else {
743            let timestamp = SystemTime::now()
744                .duration_since(UNIX_EPOCH)
745                .unwrap_or_default()
746                .saturating_sub(BUCKET_INTERVAL)
747                .as_secs();
748
749            // Split all buckets after the cutoff time. `split` contains newer buckets, which should
750            // remain, so swap them. After the swap, `split` contains all older buckets.
751            let mut split = self.buckets.split_off(&timestamp);
752            std::mem::swap(&mut split, &mut self.buckets);
753
754            self.weight -= split
755                .values()
756                .flat_map(|map| map.values())
757                .map(|bucket| bucket.weight())
758                .sum::<usize>();
759
760            split
761        }
762    }
763
764    pub fn weight(&self) -> usize {
765        self.weight
766    }
767}
768
769type TagMap = BTreeMap<MetricStr, MetricStr>;
770
771fn get_default_tags(options: &ClientOptions) -> TagMap {
772    let mut tags = TagMap::new();
773    if let Some(ref release) = options.release {
774        tags.insert("release".into(), release.clone());
775    }
776    tags.insert(
777        "environment".into(),
778        options
779            .environment
780            .clone()
781            .filter(|e| !e.is_empty())
782            .unwrap_or(Cow::Borrowed("production")),
783    );
784    tags
785}
786
787#[derive(Clone)]
788struct Worker {
789    shared: Arc<Mutex<SharedAggregatorState>>,
790    default_tags: TagMap,
791    transport: TransportArc,
792}
793
794impl Worker {
795    pub fn run(self) {
796        loop {
797            // Park instead of sleep so we can wake the thread up. Do not account for delays during
798            // flushing, since we benefit from some drift to spread out metric submissions.
799            thread::park_timeout(FLUSH_INTERVAL);
800
801            let buckets = {
802                let mut guard = self.shared.lock().unwrap();
803                if !guard.running {
804                    break;
805                }
806                guard.take_buckets()
807            };
808
809            self.flush_buckets(buckets);
810        }
811    }
812
813    pub fn flush_buckets(&self, buckets: BucketMap) {
814        if buckets.is_empty() {
815            return;
816        }
817
818        // The transport is usually available when flush is called. Prefer a short lock and worst
819        // case throw away the result rather than blocking the transport for too long.
820        if let Ok(output) = self.format_payload(buckets) {
821            let mut envelope = Envelope::new();
822            envelope.add_item(EnvelopeItem::Statsd(output));
823
824            if let Some(ref transport) = *self.transport.read().unwrap() {
825                transport.send_envelope(envelope);
826            }
827        }
828    }
829
830    fn format_payload(&self, buckets: BucketMap) -> std::io::Result<Vec<u8>> {
831        use std::io::Write;
832        let mut out = vec![];
833
834        for (timestamp, buckets) in buckets {
835            for (key, value) in buckets {
836                write!(
837                    &mut out,
838                    "{}",
839                    normalization::normalize_name(key.name.as_ref())
840                )?;
841                match key.unit {
842                    MetricUnit::Custom(u) => {
843                        write!(&mut out, "@{}", normalization::normalize_unit(u.as_ref()))?
844                    }
845                    _ => write!(&mut out, "@{}", key.unit)?,
846                }
847                match value {
848                    BucketValue::Counter(c) => {
849                        write!(&mut out, ":{}", c)?;
850                    }
851                    BucketValue::Distribution(d) => {
852                        for v in d {
853                            write!(&mut out, ":{}", v)?;
854                        }
855                    }
856                    BucketValue::Set(s) => {
857                        for v in s {
858                            write!(&mut out, ":{}", v)?;
859                        }
860                    }
861                    BucketValue::Gauge(g) => {
862                        write!(
863                            &mut out,
864                            ":{}:{}:{}:{}:{}",
865                            g.last, g.min, g.max, g.sum, g.count
866                        )?;
867                    }
868                }
869
870                write!(&mut out, "|{}", key.ty.as_str())?;
871                let normalized_tags =
872                    normalization::normalize_tags(&key.tags).with_default_tags(&self.default_tags);
873                write!(&mut out, "|#{}", normalized_tags)?;
874                writeln!(&mut out, "|T{}", timestamp)?;
875            }
876        }
877
878        Ok(out)
879    }
880}
881
882impl fmt::Debug for Worker {
883    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
884        f.debug_struct("Worker")
885            .field("transport", &format_args!("ArcTransport"))
886            .field("default_tags", &self.default_tags)
887            .finish()
888    }
889}
890
891#[derive(Debug)]
892pub(crate) struct MetricAggregator {
893    local_worker: Worker,
894    handle: Option<JoinHandle<()>>,
895}
896
897impl MetricAggregator {
898    pub fn new(transport: TransportArc, options: &ClientOptions) -> Self {
899        let worker = Worker {
900            shared: Arc::new(Mutex::new(SharedAggregatorState::new())),
901            default_tags: get_default_tags(options),
902            transport,
903        };
904
905        let local_worker = worker.clone();
906
907        let handle = thread::Builder::new()
908            .name("sentry-metrics".into())
909            .spawn(move || worker.run())
910            .expect("failed to spawn thread");
911
912        Self {
913            local_worker,
914            handle: Some(handle),
915        }
916    }
917
918    pub fn add(&self, metric: Metric) {
919        let Metric {
920            name,
921            unit,
922            value,
923            tags,
924            time,
925        } = metric;
926
927        let timestamp = time
928            .unwrap_or_else(SystemTime::now)
929            .duration_since(UNIX_EPOCH)
930            .unwrap_or_default()
931            .as_secs();
932
933        let key = BucketKey {
934            ty: value.ty(),
935            name,
936            unit,
937            tags,
938        };
939
940        let mut guard = self.local_worker.shared.lock().unwrap();
941        guard.add(timestamp, key, value);
942
943        if guard.weight() > MAX_WEIGHT {
944            if let Some(ref handle) = self.handle {
945                guard.force_flush = true;
946                handle.thread().unpark();
947            }
948        }
949    }
950
951    pub fn flush(&self) {
952        let buckets = {
953            let mut guard = self.local_worker.shared.lock().unwrap();
954            guard.force_flush = true;
955            guard.take_buckets()
956        };
957
958        self.local_worker.flush_buckets(buckets);
959    }
960}
961
962impl Drop for MetricAggregator {
963    fn drop(&mut self) {
964        let buckets = {
965            let mut guard = self.local_worker.shared.lock().unwrap();
966            guard.running = false;
967            guard.take_buckets()
968        };
969
970        self.local_worker.flush_buckets(buckets);
971
972        if let Some(handle) = self.handle.take() {
973            handle.thread().unpark();
974            handle.join().unwrap();
975        }
976    }
977}
978
#[cfg(test)]
mod tests {
    use crate::test::{with_captured_envelopes, with_captured_envelopes_options};
    use crate::ClientOptions;

    use super::*;

    /// Returns the current system time and rounded bucket timestamp.
    ///
    /// The timestamp is truncated to whole seconds and then rounded down to a multiple of 10.
    fn current_time() -> (SystemTime, u64) {
        let now = SystemTime::now();
        let timestamp = now.duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
        let timestamp = timestamp / 10 * 10;

        (now, timestamp)
    }

    /// Returns the trimmed statsd payload of the first item of the only captured envelope.
    ///
    /// Panics if there is not exactly one envelope, or if its first item is not a statsd item.
    fn get_single_metrics(envelopes: &[Envelope]) -> &str {
        assert_eq!(envelopes.len(), 1, "expected exactly one envelope");

        let mut items = envelopes[0].items();
        let Some(EnvelopeItem::Statsd(payload)) = items.next() else {
            panic!("expected metrics item");
        };

        std::str::from_utf8(payload).unwrap().trim()
    }

    #[test]
    fn test_tags() {
        let (time, ts) = current_time();

        let envelopes = with_captured_envelopes(|| {
            Metric::count("my.metric")
                .with_tag("foo", "bar")
                .with_tag("and", "more")
                .with_time(time)
                .send();
        });

        let metrics = get_single_metrics(&envelopes);
        assert_eq!(
            metrics,
            format!("my.metric@none:1|c|#and:more,environment:production,foo:bar|T{ts}")
        );
    }

    #[test]
    fn test_unit() {
        let (time, ts) = current_time();

        let envelopes = with_captured_envelopes(|| {
            Metric::count("my.metric")
                .with_time(time)
                .with_unit("custom")
                .send();
        });

        let metrics = get_single_metrics(&envelopes);
        assert_eq!(
            metrics,
            format!("my.metric@custom:1|c|#environment:production|T{ts}")
        );
    }

    #[test]
    fn test_metric_sanitation() {
        let (time, ts) = current_time();

        let envelopes = with_captured_envelopes(|| {
            Metric::count("my$$$metric").with_time(time).send();
        });

        let metrics = get_single_metrics(&envelopes);
        assert_eq!(
            metrics,
            format!("my___metric@none:1|c|#environment:production|T{ts}")
        );
    }

    #[test]
    fn test_tag_sanitation() {
        let (time, ts) = current_time();

        let envelopes = with_captured_envelopes(|| {
            Metric::count("my.metric")
                .with_tag("foo-bar$$$blub", "%$föö{}")
                .with_time(time)
                .send();
        });

        let metrics = get_single_metrics(&envelopes);
        assert_eq!(
            metrics,
            format!("my.metric@none:1|c|#environment:production,foo-barblub:%$föö{{}}|T{ts}")
        );
    }

    #[test]
    fn test_default_tags() {
        let (time, ts) = current_time();

        let options = ClientOptions {
            release: Some("myapp@1.0.0".into()),
            environment: Some("development".into()),
            ..Default::default()
        };

        let envelopes = with_captured_envelopes_options(
            || {
                Metric::count("requests")
                    .with_tag("foo", "bar")
                    .with_time(time)
                    .send();
            },
            options,
        );

        let metrics = get_single_metrics(&envelopes);
        assert_eq!(
            metrics,
            format!("requests@none:1|c|#environment:development,foo:bar,release:myapp@1.0.0|T{ts}")
        );
    }

    #[test]
    fn test_empty_default_tags() {
        let (time, ts) = current_time();
        let options = ClientOptions {
            release: Some("".into()),
            environment: Some("".into()),
            ..Default::default()
        };

        let envelopes = with_captured_envelopes_options(
            || {
                Metric::count("requests")
                    .with_tag("foo", "bar")
                    .with_time(time)
                    .send();
            },
            options,
        );

        let metrics = get_single_metrics(&envelopes);
        assert_eq!(
            metrics,
            format!("requests@none:1|c|#environment:production,foo:bar|T{ts}")
        );
    }

    #[test]
    fn test_override_default_tags() {
        let (time, ts) = current_time();
        let options = ClientOptions {
            release: Some("default_release".into()),
            environment: Some("default_env".into()),
            ..Default::default()
        };

        let envelopes = with_captured_envelopes_options(
            || {
                Metric::count("requests")
                    .with_tag("environment", "custom_env")
                    .with_tag("release", "custom_release")
                    .with_time(time)
                    .send();
            },
            options,
        );

        let metrics = get_single_metrics(&envelopes);
        assert_eq!(
            metrics,
            format!("requests@none:1|c|#environment:custom_env,release:custom_release|T{ts}")
        );
    }

    #[test]
    fn test_counter() {
        let (time, ts) = current_time();

        let envelopes = with_captured_envelopes(|| {
            Metric::count("my.metric").with_time(time).send();
            Metric::incr("my.metric", 2.0).with_time(time).send();
        });

        let metrics = get_single_metrics(&envelopes);
        assert_eq!(
            metrics,
            format!("my.metric@none:3|c|#environment:production|T{ts}")
        );
    }

    #[test]
    fn test_timing() {
        let (time, ts) = current_time();

        let envelopes = with_captured_envelopes(|| {
            Metric::timing("my.metric", Duration::from_millis(200))
                .with_time(time)
                .send();
            Metric::timing("my.metric", Duration::from_millis(100))
                .with_time(time)
                .send();
        });

        let metrics = get_single_metrics(&envelopes);
        assert_eq!(
            metrics,
            format!("my.metric@second:0.2:0.1|d|#environment:production|T{ts}")
        );
    }

    #[test]
    fn test_distribution() {
        let (time, ts) = current_time();

        let envelopes = with_captured_envelopes(|| {
            Metric::distribution("my.metric", 2.0)
                .with_time(time)
                .send();
            Metric::distribution("my.metric", 1.0)
                .with_time(time)
                .send();
        });

        let metrics = get_single_metrics(&envelopes);
        assert_eq!(
            metrics,
            format!("my.metric@none:2:1|d|#environment:production|T{ts}")
        );
    }

    #[test]
    fn test_set() {
        let (time, ts) = current_time();

        let envelopes = with_captured_envelopes(|| {
            Metric::set("my.metric", "hello").with_time(time).send();
            // Duplicate that should not be reflected twice
            Metric::set("my.metric", "hello").with_time(time).send();
            Metric::set("my.metric", "world").with_time(time).send();
        });

        let metrics = get_single_metrics(&envelopes);
        assert_eq!(
            metrics,
            format!("my.metric@none:907060870:980881731|s|#environment:production|T{ts}")
        );
    }

    #[test]
    fn test_gauge() {
        let (time, ts) = current_time();

        let envelopes = with_captured_envelopes(|| {
            Metric::gauge("my.metric", 2.0).with_time(time).send();
            Metric::gauge("my.metric", 1.0).with_time(time).send();
            Metric::gauge("my.metric", 1.5).with_time(time).send();
        });

        let metrics = get_single_metrics(&envelopes);
        assert_eq!(
            metrics,
            format!("my.metric@none:1.5:1:2:4.5:3|g|#environment:production|T{ts}")
        );
    }

    #[test]
    fn test_multiple() {
        let (time, ts) = current_time();

        let envelopes = with_captured_envelopes(|| {
            Metric::count("my.metric").with_time(time).send();
            Metric::distribution("my.dist", 2.0).with_time(time).send();
        });

        let metrics = get_single_metrics(&envelopes);

        // The relative order of the two lines is not asserted. Compare whole lines instead of
        // substrings, and pin the exact line count so that extra output is caught.
        let lines: Vec<&str> = metrics.lines().collect();
        let expected_counter = format!("my.metric@none:1|c|#environment:production|T{ts}");
        let expected_dist = format!("my.dist@none:2|d|#environment:production|T{ts}");

        assert_eq!(lines.len(), 2, "expected exactly two metric lines:\n{metrics}");
        assert!(
            lines.contains(&expected_counter.as_str()),
            "missing counter line:\n{metrics}"
        );
        assert!(
            lines.contains(&expected_dist.as_str()),
            "missing distribution line:\n{metrics}"
        );
    }

    #[test]
    fn test_regression_parse_statsd() {
        let payload = "docker.net.bytes_rcvd:27763.20237096717:27763.20237096717:27763.20237096717:27763.20237096717:1|g|#container_id:97df61f5c55b58ec9c04da3e03edc8a875ec90eb405eb5645ad9a86d0a7cd3ee,container_name:app_sidekiq_1";
        let metric = Metric::parse_statsd(payload).unwrap();
        assert_eq!(metric.name, "docker.net.bytes_rcvd");
        assert_eq!(metric.value, MetricValue::Gauge(27763.20237096717));
        assert_eq!(
            metric.tags["container_id"],
            "97df61f5c55b58ec9c04da3e03edc8a875ec90eb405eb5645ad9a86d0a7cd3ee"
        );
        assert_eq!(metric.tags["container_name"], "app_sidekiq_1");
    }
}