Skip to main content

libdd_trace_utils/span/
trace_utils.rs

1// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4//! Trace-utils functionalities implementation for tinybytes based spans
5
6use super::{v04::Span, SpanText, TraceData};
7use std::collections::HashMap;
8
9/// Span metric the mini agent must set for the backend to recognize top level span
10const TOP_LEVEL_KEY: &str = "_top_level";
11/// Span metric the tracer sets to denote a top level span
12const TRACER_TOP_LEVEL_KEY: &str = "_dd.top_level";
13const MEASURED_KEY: &str = "_dd.measured";
14const PARTIAL_VERSION_KEY: &str = "_dd.partial_version";
15
16fn set_top_level_span<T>(span: &mut Span<T>, is_top_level: bool)
17where
18    T: TraceData,
19{
20    if is_top_level {
21        span.metrics
22            .insert(T::Text::from_static_str(TOP_LEVEL_KEY), 1.0);
23    } else {
24        span.metrics.remove(TOP_LEVEL_KEY);
25    }
26}
27
28/// Updates all the spans top-level attribute.
29/// A span is considered top-level if:
30///   - it's a root span
31///   - OR its parent is unknown (other part of the code, distributed trace)
32///   - OR its parent belongs to another service (in that case it's a "local root" being the highest
33///     ancestor of other spans belonging to this service and attached to it).
34pub fn compute_top_level_span<T>(trace: &mut [Span<T>])
35where
36    T: TraceData,
37{
38    let mut span_id_idx: HashMap<u64, usize> = HashMap::new();
39    for (i, span) in trace.iter().enumerate() {
40        span_id_idx.insert(span.span_id, i);
41    }
42    for span_idx in 0..trace.len() {
43        let parent_id = trace[span_idx].parent_id;
44        if parent_id == 0 {
45            set_top_level_span(&mut trace[span_idx], true);
46            continue;
47        }
48        match span_id_idx.get(&parent_id).map(|i| &trace[*i].service) {
49            Some(parent_span_service) => {
50                if !(parent_span_service == &trace[span_idx].service) {
51                    // parent is not in the same service
52                    set_top_level_span(&mut trace[span_idx], true)
53                }
54            }
55            None => {
56                // span has no parent in chunk
57                set_top_level_span(&mut trace[span_idx], true)
58            }
59        }
60    }
61}
62
63/// Return true if the span has a top level key set
64pub fn has_top_level<T: TraceData>(span: &Span<T>) -> bool {
65    span.metrics
66        .get(TRACER_TOP_LEVEL_KEY)
67        .is_some_and(|v| *v == 1.0)
68        || span.metrics.get(TOP_LEVEL_KEY).is_some_and(|v| *v == 1.0)
69}
70
71/// Returns true if a span should be measured (i.e., it should get trace metrics calculated).
72pub fn is_measured<T: TraceData>(span: &Span<T>) -> bool {
73    span.metrics.get(MEASURED_KEY).is_some_and(|v| *v == 1.0)
74}
75
76/// Returns true if the span is a partial snapshot.
77/// This kind of spans are partial images of long-running spans.
78/// When incomplete, a partial snapshot has a metric _dd.partial_version which is a positive
79/// integer. The metric usually increases each time a new version of the same span is sent by
80/// the tracer
81pub fn is_partial_snapshot<T: TraceData>(span: &Span<T>) -> bool {
82    span.metrics
83        .get(PARTIAL_VERSION_KEY)
84        .is_some_and(|v| *v >= 0.0)
85}
86
87pub struct DroppedP0Stats {
88    pub dropped_p0_traces: usize,
89    pub dropped_p0_spans: usize,
90}
91
92// Keys used for sampling
93const SAMPLING_PRIORITY_KEY: &str = "_sampling_priority_v1";
94const SAMPLING_SINGLE_SPAN_MECHANISM: &str = "_dd.span_sampling.mechanism";
95const SAMPLING_ANALYTICS_RATE_KEY: &str = "_dd1.sr.eausr";
96
97/// Remove spans and chunks from a TraceCollection only keeping the ones that may be sampled by
98/// the agent.
99///
100/// # Returns
101///
102/// A tuple containing the dropped p0 stats, the first value correspond the amount of traces
103/// dropped and the latter to the spans dropped.
104///
105/// # Trace-level attributes
106/// Some attributes related to the whole trace are stored in the root span of the chunk.
107pub fn drop_chunks<T>(traces: &mut Vec<Vec<Span<T>>>) -> DroppedP0Stats
108where
109    T: TraceData,
110{
111    let mut dropped_p0_traces = 0;
112    let mut dropped_p0_spans = 0;
113
114    traces.retain_mut(|chunk| {
115        // ErrorSampler
116        if chunk.iter().any(|s| s.error == 1) {
117            // We send chunks containing an error
118            return true;
119        }
120
121        // PrioritySampler and NoPrioritySampler
122        let chunk_priority = chunk
123            .iter()
124            .find_map(|s| s.metrics.get(SAMPLING_PRIORITY_KEY));
125        if chunk_priority.is_none_or(|p| *p > 0.0) {
126            // We send chunks with positive priority or no priority
127            return true;
128        }
129
130        // SingleSpanSampler and AnalyzedSpansSampler
131        // List of spans to keep even if the chunk is dropped
132        let mut sampled_indexes = Vec::new();
133        for (index, span) in chunk.iter().enumerate() {
134            if span
135                .metrics
136                .get(SAMPLING_SINGLE_SPAN_MECHANISM)
137                .is_some_and(|m| *m == 8.0)
138                || span.metrics.contains_key(SAMPLING_ANALYTICS_RATE_KEY)
139            {
140                // We send spans sampled by single-span sampling or analyzed spans
141                sampled_indexes.push(index);
142            }
143        }
144        dropped_p0_spans += chunk.len() - sampled_indexes.len();
145        if sampled_indexes.is_empty() {
146            // If no spans were sampled we can drop the whole chunk
147            dropped_p0_traces += 1;
148            return false;
149        }
150        let sampled_spans = sampled_indexes
151            .iter()
152            .map(|i| std::mem::take(&mut chunk[*i]))
153            .collect();
154        *chunk = sampled_spans;
155        true
156    });
157
158    DroppedP0Stats {
159        dropped_p0_traces,
160        dropped_p0_spans,
161    }
162}
163
164#[cfg(test)]
165mod tests {
166    use super::*;
167    use crate::span::v04::SpanBytes;
168
169    fn create_test_span(
170        trace_id: u64,
171        span_id: u64,
172        parent_id: u64,
173        start: i64,
174        is_top_level: bool,
175    ) -> SpanBytes {
176        let mut span = SpanBytes {
177            trace_id: trace_id as u128,
178            span_id,
179            service: "test-service".into(),
180            name: "test_name".into(),
181            resource: "test-resource".into(),
182            parent_id,
183            start,
184            duration: 5,
185            error: 0,
186            meta: HashMap::from([
187                ("service".into(), "test-service".into()),
188                ("env".into(), "test-env".into()),
189                ("runtime-id".into(), "test-runtime-id-value".into()),
190            ]),
191            metrics: HashMap::new(),
192            r#type: "".into(),
193            meta_struct: HashMap::new(),
194            span_links: vec![],
195            span_events: vec![],
196        };
197        if is_top_level {
198            span.metrics.insert("_top_level".into(), 1.0);
199            span.meta
200                .insert("_dd.origin".into(), "cloudfunction".into());
201            span.meta.insert("origin".into(), "cloudfunction".into());
202            span.meta
203                .insert("functionname".into(), "dummy_function_name".into());
204        }
205        span
206    }
207
208    #[test]
209    fn test_has_top_level() {
210        let top_level_span = create_test_span(123, 1234, 12, 1, true);
211        let not_top_level_span = create_test_span(123, 1234, 12, 1, false);
212        assert!(has_top_level(&top_level_span));
213        assert!(!has_top_level(&not_top_level_span));
214    }
215
216    #[test]
217    fn test_is_measured() {
218        let mut measured_span = create_test_span(123, 1234, 12, 1, true);
219        measured_span.metrics.insert(MEASURED_KEY.into(), 1.0);
220        let not_measured_span = create_test_span(123, 1234, 12, 1, true);
221        assert!(is_measured(&measured_span));
222        assert!(!is_measured(&not_measured_span));
223    }
224
225    #[test]
226    fn test_compute_top_level() {
227        let mut span_with_different_service = create_test_span(123, 5, 2, 1, false);
228        span_with_different_service.service = "another_service".into();
229        let mut trace = vec![
230            // Root span, should be marked as top-level
231            create_test_span(123, 1, 0, 1, false),
232            // Should not be marked as top-level
233            create_test_span(123, 2, 1, 1, false),
234            // No parent in local trace, should be marked as
235            // top-level
236            create_test_span(123, 4, 3, 1, false),
237            // Parent belongs to another service, should be marked
238            // as top-level
239            span_with_different_service,
240        ];
241
242        compute_top_level_span(trace.as_mut_slice());
243
244        let spans_marked_as_top_level: Vec<u64> = trace
245            .iter()
246            .filter_map(|span| {
247                if has_top_level(span) {
248                    Some(span.span_id)
249                } else {
250                    None
251                }
252            })
253            .collect();
254        assert_eq!(spans_marked_as_top_level, [1, 4, 5])
255    }
256
257    #[test]
258    fn test_drop_chunks() {
259        let chunk_with_priority = vec![
260            SpanBytes {
261                span_id: 1,
262                metrics: HashMap::from([
263                    (SAMPLING_PRIORITY_KEY.into(), 1.0),
264                    (TRACER_TOP_LEVEL_KEY.into(), 1.0),
265                ]),
266                ..Default::default()
267            },
268            SpanBytes {
269                span_id: 2,
270                parent_id: 1,
271                ..Default::default()
272            },
273        ];
274        let chunk_with_null_priority = vec![
275            SpanBytes {
276                span_id: 1,
277                metrics: HashMap::from([
278                    (SAMPLING_PRIORITY_KEY.into(), 0.0),
279                    (TRACER_TOP_LEVEL_KEY.into(), 1.0),
280                ]),
281                ..Default::default()
282            },
283            SpanBytes {
284                span_id: 2,
285                parent_id: 1,
286                ..Default::default()
287            },
288        ];
289        let chunk_without_priority = vec![
290            SpanBytes {
291                span_id: 1,
292                metrics: HashMap::from([(TRACER_TOP_LEVEL_KEY.into(), 1.0)]),
293                ..Default::default()
294            },
295            SpanBytes {
296                span_id: 2,
297                parent_id: 1,
298                ..Default::default()
299            },
300        ];
301        let chunk_with_multiple_top_level = vec![
302            SpanBytes {
303                span_id: 1,
304                metrics: HashMap::from([
305                    (SAMPLING_PRIORITY_KEY.into(), -1.0),
306                    (TRACER_TOP_LEVEL_KEY.into(), 1.0),
307                ]),
308                ..Default::default()
309            },
310            SpanBytes {
311                span_id: 2,
312                parent_id: 1,
313                ..Default::default()
314            },
315            SpanBytes {
316                span_id: 4,
317                parent_id: 3,
318                metrics: HashMap::from([(TRACER_TOP_LEVEL_KEY.into(), 1.0)]),
319                ..Default::default()
320            },
321        ];
322        let chunk_with_error = vec![
323            SpanBytes {
324                span_id: 1,
325                error: 1,
326                metrics: HashMap::from([
327                    (SAMPLING_PRIORITY_KEY.into(), 0.0),
328                    (TRACER_TOP_LEVEL_KEY.into(), 1.0),
329                ]),
330                ..Default::default()
331            },
332            SpanBytes {
333                span_id: 2,
334                parent_id: 1,
335                ..Default::default()
336            },
337        ];
338        let chunk_with_a_single_span = vec![
339            SpanBytes {
340                span_id: 1,
341                metrics: HashMap::from([
342                    (SAMPLING_PRIORITY_KEY.into(), 0.0),
343                    (TRACER_TOP_LEVEL_KEY.into(), 1.0),
344                ]),
345                ..Default::default()
346            },
347            SpanBytes {
348                span_id: 2,
349                parent_id: 1,
350                metrics: HashMap::from([(SAMPLING_SINGLE_SPAN_MECHANISM.into(), 8.0)]),
351                ..Default::default()
352            },
353        ];
354        let chunk_with_analyzed_span = vec![
355            SpanBytes {
356                span_id: 1,
357                metrics: HashMap::from([
358                    (SAMPLING_PRIORITY_KEY.into(), 0.0),
359                    (TRACER_TOP_LEVEL_KEY.into(), 1.0),
360                ]),
361                ..Default::default()
362            },
363            SpanBytes {
364                span_id: 2,
365                parent_id: 1,
366                metrics: HashMap::from([(SAMPLING_ANALYTICS_RATE_KEY.into(), 1.0)]),
367                ..Default::default()
368            },
369        ];
370
371        let chunks_and_expected_sampled_spans = vec![
372            (chunk_with_priority, 2),
373            (chunk_with_null_priority, 0),
374            (chunk_without_priority, 2),
375            (chunk_with_multiple_top_level, 0),
376            (chunk_with_error, 2),
377            (chunk_with_a_single_span, 1),
378            (chunk_with_analyzed_span, 1),
379        ];
380
381        for (chunk, expected_count) in chunks_and_expected_sampled_spans.into_iter() {
382            let mut traces = vec![chunk];
383            drop_chunks(&mut traces);
384
385            if expected_count == 0 {
386                assert!(traces.is_empty());
387            } else {
388                assert_eq!(traces[0].len(), expected_count);
389            }
390        }
391    }
392}