Skip to main content

libdd_trace_stats/span_concentrator/
mod.rs

1// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3//! This module implements the SpanConcentrator used to aggregate spans into stats
4use std::collections::HashMap;
5use std::time::{self, Duration, SystemTime};
6
7use libdd_trace_protobuf::pb;
8
9use aggregation::{BorrowedAggregationKey, StatsBucket};
10use stat_span::StatSpan;
11
12mod aggregation;
13
14mod stat_span;
15
/// Return the `Duration` elapsed between `t` and the unix epoch.
///
/// If `t` is before the unix epoch, return a zero duration instead of
/// propagating the `SystemTimeError`.
fn system_time_to_unix_duration(t: SystemTime) -> Duration {
    // `duration_since` only fails when `t` predates the epoch; clamp to zero.
    t.duration_since(time::UNIX_EPOCH).unwrap_or(Duration::ZERO)
}
22
/// Round `t` down to the start of the bucket that contains it.
#[inline]
fn align_timestamp(t: u64, bucket_size: u64) -> u64 {
    // Drop the offset of `t` inside its bucket to land on the bucket start.
    let offset_within_bucket = t % bucket_size;
    t - offset_within_bucket
}
28
29/// Return true if the span is eligible for stats computation
30fn is_span_eligible<'a, T>(span: &'a T, span_kinds_stats_computed: &[String]) -> bool
31where
32    T: StatSpan<'a>,
33{
34    (span.has_top_level() || span.is_measured() || {
35        span.get_meta("span.kind")
36            .is_some_and(|span_kind| span_kinds_stats_computed.contains(&span_kind.to_lowercase()))
37    }) && !span.is_partial_snapshot()
38}
39
/// SpanConcentrator compute stats on span aggregated by time and span attributes
///
/// # Aggregation
/// Spans are aggregated into time buckets based on their end_time. Within each time bucket there
/// is another level of aggregation based on the spans fields (e.g. resource_name, service_name)
/// and the peer tags if the `peer_tags_aggregation` is enabled.
///
/// # Span eligibility
/// The ingested spans are only aggregated if they are root, top-level, measured or if their
/// `span.kind` is eligible and the `compute_stats_by_span_kind` is enabled.
///
/// # Flushing
/// When the SpanConcentrator is flushed it keeps the `buffer_len` most recent buckets and remove
/// all older buckets returning their content. When using force flush all buckets are flushed
/// regardless of their age.
#[derive(Debug, Clone)]
pub struct SpanConcentrator {
    /// Size of the time buckets used for aggregation in nanos
    bucket_size: u64,
    /// Time buckets keyed by their aligned start timestamp, in nanos since the unix epoch.
    buckets: HashMap<u64, StatsBucket>,
    /// Timestamp of the oldest time bucket for which we allow data.
    /// Any ingested stats older than it get added to this bucket.
    oldest_timestamp: u64,
    /// `buffer_len` is the number of stats buckets we keep when flushing.
    buffer_len: usize,
    /// span.kind values eligible for stats computation
    span_kinds_stats_computed: Vec<String>,
    /// keys for supplementary tags that describe peer.service entities
    peer_tag_keys: Vec<String>,
}
70
71impl SpanConcentrator {
72    /// Return a new concentrator with the given parameters
73    /// - `bucket_size` is the size of the time buckets
74    /// - `now` the current system time, used to define the oldest bucket
75    /// - `span_kinds_stats_computed` list of span kinds eligible for stats computation
76    /// - `peer_tags_keys` list of keys considered as peer tags for aggregation
77    pub fn new(
78        bucket_size: Duration,
79        now: SystemTime,
80        span_kinds_stats_computed: Vec<String>,
81        peer_tag_keys: Vec<String>,
82    ) -> SpanConcentrator {
83        SpanConcentrator {
84            bucket_size: bucket_size.as_nanos() as u64,
85            buckets: HashMap::new(),
86            oldest_timestamp: align_timestamp(
87                system_time_to_unix_duration(now).as_nanos() as u64,
88                bucket_size.as_nanos() as u64,
89            ),
90            buffer_len: 2,
91            span_kinds_stats_computed,
92            peer_tag_keys,
93        }
94    }
95
96    /// Set the list of span kinds eligible for stats computation
97    pub fn set_span_kinds(&mut self, span_kinds: Vec<String>) {
98        self.span_kinds_stats_computed = span_kinds;
99    }
100
101    /// Set the list of keys considered as peer_tags for aggregation
102    pub fn set_peer_tags(&mut self, peer_tags: Vec<String>) {
103        self.peer_tag_keys = peer_tags;
104    }
105
106    /// Return the bucket size used for aggregation
107    pub fn get_bucket_size(&self) -> Duration {
108        Duration::from_nanos(self.bucket_size)
109    }
110
111    /// Add a span into the concentrator, by computing stats if the span is eligible for stats
112    /// computation.
113    pub fn add_span<'a, T>(&'a mut self, span: &'a T)
114    where
115        T: StatSpan<'a>,
116    {
117        // If the span is eligible for stats computation
118        if is_span_eligible(span, self.span_kinds_stats_computed.as_slice()) {
119            let mut bucket_timestamp =
120                align_timestamp((span.start() + span.duration()) as u64, self.bucket_size);
121            // If the span is to old we aggregate it in the latest bucket instead of
122            // creating a new one
123            if bucket_timestamp < self.oldest_timestamp {
124                bucket_timestamp = self.oldest_timestamp;
125            }
126
127            let agg_key = BorrowedAggregationKey::from_span(span, self.peer_tag_keys.as_slice());
128
129            self.buckets
130                .entry(bucket_timestamp)
131                .or_insert(StatsBucket::new(bucket_timestamp))
132                .insert(
133                    agg_key,
134                    span.duration(),
135                    span.is_error(),
136                    span.has_top_level(),
137                );
138        }
139    }
140
141    /// Flush all stats bucket except for the `buffer_len` most recent. If `force` is true, flush
142    /// all buckets.
143    pub fn flush(&mut self, now: SystemTime, force: bool) -> Vec<pb::ClientStatsBucket> {
144        // TODO: Wait for HashMap::extract_if to be stabilized to avoid a full drain
145        let now_timestamp = system_time_to_unix_duration(now).as_nanos() as u64;
146        let buckets: Vec<(u64, StatsBucket)> = self.buckets.drain().collect();
147        self.oldest_timestamp = if force {
148            align_timestamp(now_timestamp, self.bucket_size)
149        } else {
150            align_timestamp(now_timestamp, self.bucket_size)
151                - (self.buffer_len as u64 - 1) * self.bucket_size
152        };
153        buckets
154            .into_iter()
155            .filter_map(|(timestamp, bucket)| {
156                // Always keep `bufferLen` buckets (default is 2: current + previous one).
157                // This is a trade-off: we accept slightly late traces (clock skew and stuff)
158                // but we delay flushing by at most `bufferLen` buckets.
159                // This delay might result in not flushing stats payload (data loss)
160                // if the tracer stops while the latest buckets aren't old enough to be flushed.
161                // The "force" boolean skips the delay and flushes all buckets, typically on
162                // shutdown.
163                if !force && timestamp > (now_timestamp - self.buffer_len as u64 * self.bucket_size)
164                {
165                    self.buckets.insert(timestamp, bucket);
166                    return None;
167                }
168                Some(bucket.flush(self.bucket_size))
169            })
170            .collect()
171    }
172}
173
174#[cfg(test)]
175mod tests;