libdd_trace_stats/span_concentrator/mod.rs
1// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3//! This module implements the SpanConcentrator used to aggregate spans into stats
4use std::collections::HashMap;
5use std::time::{self, Duration, SystemTime};
6
7use libdd_trace_protobuf::pb;
8
9use aggregation::{BorrowedAggregationKey, StatsBucket};
10use stat_span::StatSpan;
11
12mod aggregation;
13
14mod stat_span;
15
/// Return the `Duration` elapsed between the unix epoch and `t`.
///
/// If `t` is before the unix epoch, return a zero `Duration` instead of
/// failing, so callers always get a usable timestamp.
fn system_time_to_unix_duration(t: SystemTime) -> Duration {
    // `duration_since` errors when `t` predates the epoch; clamp to zero
    // (`Duration::default()` is the zero duration).
    t.duration_since(time::UNIX_EPOCH).unwrap_or_default()
}
22
/// Round `t` down to the start of the bucket that contains it.
///
/// Buckets are `bucket_size` wide and anchored at 0, so the result is the
/// largest multiple of `bucket_size` that is `<= t`.
#[inline]
fn align_timestamp(t: u64, bucket_size: u64) -> u64 {
    // Integer division truncates toward zero, which floors unsigned values.
    (t / bucket_size) * bucket_size
}
28
29/// Return true if the span is eligible for stats computation
30fn is_span_eligible<'a, T>(span: &'a T, span_kinds_stats_computed: &[String]) -> bool
31where
32 T: StatSpan<'a>,
33{
34 (span.has_top_level() || span.is_measured() || {
35 span.get_meta("span.kind")
36 .is_some_and(|span_kind| span_kinds_stats_computed.contains(&span_kind.to_lowercase()))
37 }) && !span.is_partial_snapshot()
38}
39
/// SpanConcentrator compute stats on span aggregated by time and span attributes
///
/// # Aggregation
/// Spans are aggregated into time buckets based on their end_time. Within each time bucket there
/// is another level of aggregation based on the spans fields (e.g. resource_name, service_name)
/// and the peer tags if the `peer_tags_aggregation` is enabled.
///
/// # Span eligibility
/// The ingested spans are only aggregated if they are root, top-level, measured or if their
/// `span.kind` is eligible and the `compute_stats_by_span_kind` is enabled.
///
/// # Flushing
/// When the SpanConcentrator is flushed it keeps the `buffer_len` most recent buckets and remove
/// all older buckets returning their content. When using force flush all buckets are flushed
/// regardless of their age.
#[derive(Debug, Clone)]
pub struct SpanConcentrator {
    /// Size of the time buckets used for aggregation in nanos
    bucket_size: u64,
    /// Stats buckets keyed by their aligned start timestamp (unix nanos).
    buckets: HashMap<u64, StatsBucket>,
    /// Timestamp of the oldest time bucket for which we allow data.
    /// Any ingested stats older than it get added to this bucket.
    oldest_timestamp: u64,
    /// Number of stats buckets kept (not flushed) on a non-forced flush.
    buffer_len: usize,
    /// span.kind fields eligible for stats computation
    span_kinds_stats_computed: Vec<String>,
    /// keys for supplementary tags that describe peer.service entities
    peer_tag_keys: Vec<String>,
}
70
71impl SpanConcentrator {
72 /// Return a new concentrator with the given parameters
73 /// - `bucket_size` is the size of the time buckets
74 /// - `now` the current system time, used to define the oldest bucket
75 /// - `span_kinds_stats_computed` list of span kinds eligible for stats computation
76 /// - `peer_tags_keys` list of keys considered as peer tags for aggregation
77 pub fn new(
78 bucket_size: Duration,
79 now: SystemTime,
80 span_kinds_stats_computed: Vec<String>,
81 peer_tag_keys: Vec<String>,
82 ) -> SpanConcentrator {
83 SpanConcentrator {
84 bucket_size: bucket_size.as_nanos() as u64,
85 buckets: HashMap::new(),
86 oldest_timestamp: align_timestamp(
87 system_time_to_unix_duration(now).as_nanos() as u64,
88 bucket_size.as_nanos() as u64,
89 ),
90 buffer_len: 2,
91 span_kinds_stats_computed,
92 peer_tag_keys,
93 }
94 }
95
96 /// Set the list of span kinds eligible for stats computation
97 pub fn set_span_kinds(&mut self, span_kinds: Vec<String>) {
98 self.span_kinds_stats_computed = span_kinds;
99 }
100
101 /// Set the list of keys considered as peer_tags for aggregation
102 pub fn set_peer_tags(&mut self, peer_tags: Vec<String>) {
103 self.peer_tag_keys = peer_tags;
104 }
105
106 /// Return the bucket size used for aggregation
107 pub fn get_bucket_size(&self) -> Duration {
108 Duration::from_nanos(self.bucket_size)
109 }
110
111 /// Add a span into the concentrator, by computing stats if the span is eligible for stats
112 /// computation.
113 pub fn add_span<'a, T>(&'a mut self, span: &'a T)
114 where
115 T: StatSpan<'a>,
116 {
117 // If the span is eligible for stats computation
118 if is_span_eligible(span, self.span_kinds_stats_computed.as_slice()) {
119 let mut bucket_timestamp =
120 align_timestamp((span.start() + span.duration()) as u64, self.bucket_size);
121 // If the span is to old we aggregate it in the latest bucket instead of
122 // creating a new one
123 if bucket_timestamp < self.oldest_timestamp {
124 bucket_timestamp = self.oldest_timestamp;
125 }
126
127 let agg_key = BorrowedAggregationKey::from_span(span, self.peer_tag_keys.as_slice());
128
129 self.buckets
130 .entry(bucket_timestamp)
131 .or_insert(StatsBucket::new(bucket_timestamp))
132 .insert(
133 agg_key,
134 span.duration(),
135 span.is_error(),
136 span.has_top_level(),
137 );
138 }
139 }
140
141 /// Flush all stats bucket except for the `buffer_len` most recent. If `force` is true, flush
142 /// all buckets.
143 pub fn flush(&mut self, now: SystemTime, force: bool) -> Vec<pb::ClientStatsBucket> {
144 // TODO: Wait for HashMap::extract_if to be stabilized to avoid a full drain
145 let now_timestamp = system_time_to_unix_duration(now).as_nanos() as u64;
146 let buckets: Vec<(u64, StatsBucket)> = self.buckets.drain().collect();
147 self.oldest_timestamp = if force {
148 align_timestamp(now_timestamp, self.bucket_size)
149 } else {
150 align_timestamp(now_timestamp, self.bucket_size)
151 - (self.buffer_len as u64 - 1) * self.bucket_size
152 };
153 buckets
154 .into_iter()
155 .filter_map(|(timestamp, bucket)| {
156 // Always keep `bufferLen` buckets (default is 2: current + previous one).
157 // This is a trade-off: we accept slightly late traces (clock skew and stuff)
158 // but we delay flushing by at most `bufferLen` buckets.
159 // This delay might result in not flushing stats payload (data loss)
160 // if the tracer stops while the latest buckets aren't old enough to be flushed.
161 // The "force" boolean skips the delay and flushes all buckets, typically on
162 // shutdown.
163 if !force && timestamp > (now_timestamp - self.buffer_len as u64 * self.bucket_size)
164 {
165 self.buckets.insert(timestamp, bucket);
166 return None;
167 }
168 Some(bucket.flush(self.bucket_size))
169 })
170 .collect()
171 }
172}
173
174#[cfg(test)]
175mod tests;