libdd_trace_utils/span/
trace_utils.rs

1// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
//! Trace utility functions for tinybytes-based spans
5
6use super::{Span, SpanText};
7use std::collections::HashMap;
8
/// Span metric the mini agent must set for the backend to recognize a top-level span
const TOP_LEVEL_KEY: &str = "_top_level";
/// Span metric the tracer sets to denote a top-level span
const TRACER_TOP_LEVEL_KEY: &str = "_dd.top_level";
/// Span metric read by `is_measured`; a value of 1.0 marks the span as measured
const MEASURED_KEY: &str = "_dd.measured";
/// Span metric read by `is_partial_snapshot`; a non-negative value marks a partial snapshot
const PARTIAL_VERSION_KEY: &str = "_dd.partial_version";
15
16fn set_top_level_span<T>(span: &mut Span<T>, is_top_level: bool)
17where
18    T: SpanText,
19{
20    if is_top_level {
21        span.metrics.insert(T::from_static_str(TOP_LEVEL_KEY), 1.0);
22    } else {
23        span.metrics.remove(TOP_LEVEL_KEY);
24    }
25}
26
27/// Updates all the spans top-level attribute.
28/// A span is considered top-level if:
29///   - it's a root span
30///   - OR its parent is unknown (other part of the code, distributed trace)
31///   - OR its parent belongs to another service (in that case it's a "local root" being the highest
32///     ancestor of other spans belonging to this service and attached to it).
33pub fn compute_top_level_span<T>(trace: &mut [Span<T>])
34where
35    T: SpanText,
36{
37    let mut span_id_idx: HashMap<u64, usize> = HashMap::new();
38    for (i, span) in trace.iter().enumerate() {
39        span_id_idx.insert(span.span_id, i);
40    }
41    for span_idx in 0..trace.len() {
42        let parent_id = trace[span_idx].parent_id;
43        if parent_id == 0 {
44            set_top_level_span(&mut trace[span_idx], true);
45            continue;
46        }
47        match span_id_idx.get(&parent_id).map(|i| &trace[*i].service) {
48            Some(parent_span_service) => {
49                if !(parent_span_service == &trace[span_idx].service) {
50                    // parent is not in the same service
51                    set_top_level_span(&mut trace[span_idx], true)
52                }
53            }
54            None => {
55                // span has no parent in chunk
56                set_top_level_span(&mut trace[span_idx], true)
57            }
58        }
59    }
60}
61
62/// Return true if the span has a top level key set
63pub fn has_top_level<T: SpanText>(span: &Span<T>) -> bool {
64    span.metrics
65        .get(TRACER_TOP_LEVEL_KEY)
66        .is_some_and(|v| *v == 1.0)
67        || span.metrics.get(TOP_LEVEL_KEY).is_some_and(|v| *v == 1.0)
68}
69
70/// Returns true if a span should be measured (i.e., it should get trace metrics calculated).
71pub fn is_measured<T: SpanText>(span: &Span<T>) -> bool {
72    span.metrics.get(MEASURED_KEY).is_some_and(|v| *v == 1.0)
73}
74
75/// Returns true if the span is a partial snapshot.
76/// This kind of spans are partial images of long-running spans.
77/// When incomplete, a partial snapshot has a metric _dd.partial_version which is a positive
78/// integer. The metric usually increases each time a new version of the same span is sent by
79/// the tracer
80pub fn is_partial_snapshot<T: SpanText>(span: &Span<T>) -> bool {
81    span.metrics
82        .get(PARTIAL_VERSION_KEY)
83        .is_some_and(|v| *v >= 0.0)
84}
85
/// Counters describing what [`drop_chunks`] discarded.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct DroppedP0Stats {
    /// Number of chunks that were removed entirely.
    pub dropped_p0_traces: usize,
    /// Number of spans that were removed: every span of a removed chunk plus the unsampled
    /// spans stripped from chunks that were only partially kept.
    pub dropped_p0_spans: usize,
}
90
// Keys used for sampling
/// Span metric holding the sampling priority; `drop_chunks` keeps a chunk untouched when the
/// first priority found in it is strictly positive or when no span carries one
const SAMPLING_PRIORITY_KEY: &str = "_sampling_priority_v1";
/// Span metric for single-span sampling; `drop_chunks` keeps a span when it equals 8.0
const SAMPLING_SINGLE_SPAN_MECHANISM: &str = "_dd.span_sampling.mechanism";
/// Span metric whose mere presence makes `drop_chunks` keep the span (analyzed span)
const SAMPLING_ANALYTICS_RATE_KEY: &str = "_dd1.sr.eausr";
95
96/// Remove spans and chunks from a TraceCollection only keeping the ones that may be sampled by
97/// the agent.
98///
99/// # Returns
100///
101/// A tuple containing the dropped p0 stats, the first value correspond the amount of traces
102/// dropped and the latter to the spans dropped.
103///
104/// # Trace-level attributes
105/// Some attributes related to the whole trace are stored in the root span of the chunk.  
106pub fn drop_chunks<T>(traces: &mut Vec<Vec<Span<T>>>) -> DroppedP0Stats
107where
108    T: SpanText,
109{
110    let mut dropped_p0_traces = 0;
111    let mut dropped_p0_spans = 0;
112
113    traces.retain_mut(|chunk| {
114        // ErrorSampler
115        if chunk.iter().any(|s| s.error == 1) {
116            // We send chunks containing an error
117            return true;
118        }
119
120        // PrioritySampler and NoPrioritySampler
121        let chunk_priority = chunk
122            .iter()
123            .find_map(|s| s.metrics.get(SAMPLING_PRIORITY_KEY));
124        if chunk_priority.is_none_or(|p| *p > 0.0) {
125            // We send chunks with positive priority or no priority
126            return true;
127        }
128
129        // SingleSpanSampler and AnalyzedSpansSampler
130        // List of spans to keep even if the chunk is dropped
131        let mut sampled_indexes = Vec::new();
132        for (index, span) in chunk.iter().enumerate() {
133            if span
134                .metrics
135                .get(SAMPLING_SINGLE_SPAN_MECHANISM)
136                .is_some_and(|m| *m == 8.0)
137                || span.metrics.contains_key(SAMPLING_ANALYTICS_RATE_KEY)
138            {
139                // We send spans sampled by single-span sampling or analyzed spans
140                sampled_indexes.push(index);
141            }
142        }
143        dropped_p0_spans += chunk.len() - sampled_indexes.len();
144        if sampled_indexes.is_empty() {
145            // If no spans were sampled we can drop the whole chunk
146            dropped_p0_traces += 1;
147            return false;
148        }
149        let sampled_spans = sampled_indexes
150            .iter()
151            .map(|i| std::mem::take(&mut chunk[*i]))
152            .collect();
153        *chunk = sampled_spans;
154        true
155    });
156
157    DroppedP0Stats {
158        dropped_p0_traces,
159        dropped_p0_spans,
160    }
161}
162
#[cfg(test)]
mod tests {
    use super::*;
    use crate::span::SpanBytes;

    /// Builds a span of service "test-service" with fixed name, resource, duration and meta.
    /// When `is_top_level` is true the `_top_level` metric and the cloud-function meta entries
    /// are added as well.
    fn create_test_span(
        trace_id: u64,
        span_id: u64,
        parent_id: u64,
        start: i64,
        is_top_level: bool,
    ) -> SpanBytes {
        let mut span = SpanBytes {
            trace_id: trace_id as u128,
            span_id,
            service: "test-service".into(),
            name: "test_name".into(),
            resource: "test-resource".into(),
            parent_id,
            start,
            duration: 5,
            error: 0,
            meta: HashMap::from([
                ("service".into(), "test-service".into()),
                ("env".into(), "test-env".into()),
                ("runtime-id".into(), "test-runtime-id-value".into()),
            ]),
            metrics: HashMap::new(),
            r#type: "".into(),
            meta_struct: HashMap::new(),
            span_links: vec![],
            span_events: vec![],
        };
        if is_top_level {
            span.metrics.insert("_top_level".into(), 1.0);
            span.meta
                .insert("_dd.origin".into(), "cloudfunction".into());
            span.meta.insert("origin".into(), "cloudfunction".into());
            span.meta
                .insert("functionname".into(), "dummy_function_name".into());
        }
        span
    }

    #[test]
    fn test_has_top_level() {
        let top_level_span = create_test_span(123, 1234, 12, 1, true);
        let not_top_level_span = create_test_span(123, 1234, 12, 1, false);
        assert!(has_top_level(&top_level_span));
        assert!(!has_top_level(&not_top_level_span));

        // The tracer-side key alone must be recognized too
        let mut tracer_top_level_span = create_test_span(123, 1234, 12, 1, false);
        tracer_top_level_span
            .metrics
            .insert(TRACER_TOP_LEVEL_KEY.into(), 1.0);
        assert!(has_top_level(&tracer_top_level_span));
    }

    #[test]
    fn test_is_measured() {
        let mut measured_span = create_test_span(123, 1234, 12, 1, true);
        measured_span.metrics.insert(MEASURED_KEY.into(), 1.0);
        let not_measured_span = create_test_span(123, 1234, 12, 1, true);
        assert!(is_measured(&measured_span));
        assert!(!is_measured(&not_measured_span));
    }

    #[test]
    fn test_is_partial_snapshot() {
        // Without the metric the span is not a partial snapshot
        let complete_span = create_test_span(123, 1234, 12, 1, false);
        assert!(!is_partial_snapshot(&complete_span));

        // Any non-negative version marks a partial snapshot, a negative one does not
        for (version, expected) in [(0.0, true), (3.0, true), (-1.0, false)] {
            let mut span = create_test_span(123, 1234, 12, 1, false);
            span.metrics.insert(PARTIAL_VERSION_KEY.into(), version);
            assert_eq!(is_partial_snapshot(&span), expected);
        }
    }

    #[test]
    fn test_compute_top_level() {
        let mut span_with_different_service = create_test_span(123, 5, 2, 1, false);
        span_with_different_service.service = "another_service".into();
        let mut trace = vec![
            // Root span, should be marked as top-level
            create_test_span(123, 1, 0, 1, false),
            // Should not be marked as top-level
            create_test_span(123, 2, 1, 1, false),
            // No parent in local trace, should be marked as
            // top-level
            create_test_span(123, 4, 3, 1, false),
            // Parent belongs to another service, should be marked
            // as top-level
            span_with_different_service,
        ];

        compute_top_level_span(trace.as_mut_slice());

        let spans_marked_as_top_level: Vec<u64> = trace
            .iter()
            .filter_map(|span| {
                if has_top_level(span) {
                    Some(span.span_id)
                } else {
                    None
                }
            })
            .collect();
        assert_eq!(spans_marked_as_top_level, [1, 4, 5])
    }

    #[test]
    fn test_drop_chunks() {
        let chunk_with_priority = vec![
            SpanBytes {
                span_id: 1,
                metrics: HashMap::from([
                    (SAMPLING_PRIORITY_KEY.into(), 1.0),
                    (TRACER_TOP_LEVEL_KEY.into(), 1.0),
                ]),
                ..Default::default()
            },
            SpanBytes {
                span_id: 2,
                parent_id: 1,
                ..Default::default()
            },
        ];
        let chunk_with_null_priority = vec![
            SpanBytes {
                span_id: 1,
                metrics: HashMap::from([
                    (SAMPLING_PRIORITY_KEY.into(), 0.0),
                    (TRACER_TOP_LEVEL_KEY.into(), 1.0),
                ]),
                ..Default::default()
            },
            SpanBytes {
                span_id: 2,
                parent_id: 1,
                ..Default::default()
            },
        ];
        let chunk_without_priority = vec![
            SpanBytes {
                span_id: 1,
                metrics: HashMap::from([(TRACER_TOP_LEVEL_KEY.into(), 1.0)]),
                ..Default::default()
            },
            SpanBytes {
                span_id: 2,
                parent_id: 1,
                ..Default::default()
            },
        ];
        let chunk_with_multiple_top_level = vec![
            SpanBytes {
                span_id: 1,
                metrics: HashMap::from([
                    (SAMPLING_PRIORITY_KEY.into(), -1.0),
                    (TRACER_TOP_LEVEL_KEY.into(), 1.0),
                ]),
                ..Default::default()
            },
            SpanBytes {
                span_id: 2,
                parent_id: 1,
                ..Default::default()
            },
            SpanBytes {
                span_id: 4,
                parent_id: 3,
                metrics: HashMap::from([(TRACER_TOP_LEVEL_KEY.into(), 1.0)]),
                ..Default::default()
            },
        ];
        let chunk_with_error = vec![
            SpanBytes {
                span_id: 1,
                error: 1,
                metrics: HashMap::from([
                    (SAMPLING_PRIORITY_KEY.into(), 0.0),
                    (TRACER_TOP_LEVEL_KEY.into(), 1.0),
                ]),
                ..Default::default()
            },
            SpanBytes {
                span_id: 2,
                parent_id: 1,
                ..Default::default()
            },
        ];
        let chunk_with_a_single_span = vec![
            SpanBytes {
                span_id: 1,
                metrics: HashMap::from([
                    (SAMPLING_PRIORITY_KEY.into(), 0.0),
                    (TRACER_TOP_LEVEL_KEY.into(), 1.0),
                ]),
                ..Default::default()
            },
            SpanBytes {
                span_id: 2,
                parent_id: 1,
                metrics: HashMap::from([(SAMPLING_SINGLE_SPAN_MECHANISM.into(), 8.0)]),
                ..Default::default()
            },
        ];
        let chunk_with_analyzed_span = vec![
            SpanBytes {
                span_id: 1,
                metrics: HashMap::from([
                    (SAMPLING_PRIORITY_KEY.into(), 0.0),
                    (TRACER_TOP_LEVEL_KEY.into(), 1.0),
                ]),
                ..Default::default()
            },
            SpanBytes {
                span_id: 2,
                parent_id: 1,
                metrics: HashMap::from([(SAMPLING_ANALYTICS_RATE_KEY.into(), 1.0)]),
                ..Default::default()
            },
        ];

        // (chunk, spans left in the chunk, dropped traces, dropped spans)
        let chunks_and_expectations = vec![
            (chunk_with_priority, 2, 0, 0),
            (chunk_with_null_priority, 0, 1, 2),
            (chunk_without_priority, 2, 0, 0),
            (chunk_with_multiple_top_level, 0, 1, 3),
            (chunk_with_error, 2, 0, 0),
            (chunk_with_a_single_span, 1, 0, 1),
            (chunk_with_analyzed_span, 1, 0, 1),
        ];

        for (chunk, expected_count, expected_dropped_traces, expected_dropped_spans) in
            chunks_and_expectations.into_iter()
        {
            let mut traces = vec![chunk];
            let stats = drop_chunks(&mut traces);

            if expected_count == 0 {
                assert!(traces.is_empty());
            } else {
                assert_eq!(traces[0].len(), expected_count);
            }
            assert_eq!(stats.dropped_p0_traces, expected_dropped_traces);
            assert_eq!(stats.dropped_p0_spans, expected_dropped_spans);
        }
    }
}