1use super::{v04::Span, SpanText, TraceData};
7use std::collections::HashMap;
8
/// Metric key that `set_top_level_span` writes with value `1.0` to flag a span as top-level
/// (and removes when asked to clear the flag).
const TOP_LEVEL_KEY: &str = "_top_level";
/// Second top-level metric key honored by `has_top_level`. Going by its name it is presumably
/// set by the tracer itself rather than by this module — confirm against the tracer docs.
const TRACER_TOP_LEVEL_KEY: &str = "_dd.top_level";
/// Metric key read by `is_measured`; a value of exactly `1.0` means the span is measured.
const MEASURED_KEY: &str = "_dd.measured";
/// Metric key read by `is_partial_snapshot`; any value `>= 0.0` marks the span as a partial
/// snapshot.
const PARTIAL_VERSION_KEY: &str = "_dd.partial_version";
15
16fn set_top_level_span<T>(span: &mut Span<T>, is_top_level: bool)
17where
18 T: TraceData,
19{
20 if is_top_level {
21 span.metrics
22 .insert(T::Text::from_static_str(TOP_LEVEL_KEY), 1.0);
23 } else {
24 span.metrics.remove(TOP_LEVEL_KEY);
25 }
26}
27
28pub fn compute_top_level_span<T>(trace: &mut [Span<T>])
35where
36 T: TraceData,
37{
38 let mut span_id_idx: HashMap<u64, usize> = HashMap::new();
39 for (i, span) in trace.iter().enumerate() {
40 span_id_idx.insert(span.span_id, i);
41 }
42 for span_idx in 0..trace.len() {
43 let parent_id = trace[span_idx].parent_id;
44 if parent_id == 0 {
45 set_top_level_span(&mut trace[span_idx], true);
46 continue;
47 }
48 match span_id_idx.get(&parent_id).map(|i| &trace[*i].service) {
49 Some(parent_span_service) => {
50 if !(parent_span_service == &trace[span_idx].service) {
51 set_top_level_span(&mut trace[span_idx], true)
53 }
54 }
55 None => {
56 set_top_level_span(&mut trace[span_idx], true)
58 }
59 }
60 }
61}
62
63pub fn has_top_level<T: TraceData>(span: &Span<T>) -> bool {
65 span.metrics
66 .get(TRACER_TOP_LEVEL_KEY)
67 .is_some_and(|v| *v == 1.0)
68 || span.metrics.get(TOP_LEVEL_KEY).is_some_and(|v| *v == 1.0)
69}
70
71pub fn is_measured<T: TraceData>(span: &Span<T>) -> bool {
73 span.metrics.get(MEASURED_KEY).is_some_and(|v| *v == 1.0)
74}
75
76pub fn is_partial_snapshot<T: TraceData>(span: &Span<T>) -> bool {
82 span.metrics
83 .get(PARTIAL_VERSION_KEY)
84 .is_some_and(|v| *v >= 0.0)
85}
86
/// Counters reported by `drop_chunks` describing what was discarded.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DroppedP0Stats {
    /// Number of chunks that were removed entirely.
    pub dropped_p0_traces: usize,
    /// Number of spans that were removed, including the spans of fully removed chunks.
    pub dropped_p0_spans: usize,
}
91
/// Metric key holding the sampling priority. `drop_chunks` uses the value of the first span
/// in a chunk that carries it as the priority of the whole chunk.
const SAMPLING_PRIORITY_KEY: &str = "_sampling_priority_v1";
/// Metric key checked by `drop_chunks`: a span whose value is exactly `8.0` is kept even when
/// the rest of its chunk is dropped.
const SAMPLING_SINGLE_SPAN_MECHANISM: &str = "_dd.span_sampling.mechanism";
/// Metric key whose mere presence makes `drop_chunks` keep the span even when the rest of its
/// chunk is dropped.
const SAMPLING_ANALYTICS_RATE_KEY: &str = "_dd1.sr.eausr";
96
97pub fn drop_chunks<T>(traces: &mut Vec<Vec<Span<T>>>) -> DroppedP0Stats
108where
109 T: TraceData,
110{
111 let mut dropped_p0_traces = 0;
112 let mut dropped_p0_spans = 0;
113
114 traces.retain_mut(|chunk| {
115 if chunk.iter().any(|s| s.error == 1) {
117 return true;
119 }
120
121 let chunk_priority = chunk
123 .iter()
124 .find_map(|s| s.metrics.get(SAMPLING_PRIORITY_KEY));
125 if chunk_priority.is_none_or(|p| *p > 0.0) {
126 return true;
128 }
129
130 let mut sampled_indexes = Vec::new();
133 for (index, span) in chunk.iter().enumerate() {
134 if span
135 .metrics
136 .get(SAMPLING_SINGLE_SPAN_MECHANISM)
137 .is_some_and(|m| *m == 8.0)
138 || span.metrics.contains_key(SAMPLING_ANALYTICS_RATE_KEY)
139 {
140 sampled_indexes.push(index);
142 }
143 }
144 dropped_p0_spans += chunk.len() - sampled_indexes.len();
145 if sampled_indexes.is_empty() {
146 dropped_p0_traces += 1;
148 return false;
149 }
150 let sampled_spans = sampled_indexes
151 .iter()
152 .map(|i| std::mem::take(&mut chunk[*i]))
153 .collect();
154 *chunk = sampled_spans;
155 true
156 });
157
158 DroppedP0Stats {
159 dropped_p0_traces,
160 dropped_p0_spans,
161 }
162}
163
#[cfg(test)]
mod tests {
    use super::*;
    use crate::span::v04::SpanBytes;

    /// Builds a span of service `test-service` with fixed name, resource and duration.
    ///
    /// When `is_top_level` is `true` the span additionally gets the `_top_level` metric and
    /// a few origin-related meta entries.
    fn create_test_span(
        trace_id: u64,
        span_id: u64,
        parent_id: u64,
        start: i64,
        is_top_level: bool,
    ) -> SpanBytes {
        let mut span = SpanBytes {
            trace_id: u128::from(trace_id),
            span_id,
            service: "test-service".into(),
            name: "test_name".into(),
            resource: "test-resource".into(),
            parent_id,
            start,
            duration: 5,
            error: 0,
            meta: HashMap::from([
                ("service".into(), "test-service".into()),
                ("env".into(), "test-env".into()),
                ("runtime-id".into(), "test-runtime-id-value".into()),
            ]),
            metrics: HashMap::new(),
            r#type: "".into(),
            meta_struct: HashMap::new(),
            span_links: vec![],
            span_events: vec![],
        };
        if is_top_level {
            span.metrics.insert("_top_level".into(), 1.0);
            span.meta
                .insert("_dd.origin".into(), "cloudfunction".into());
            span.meta.insert("origin".into(), "cloudfunction".into());
            span.meta
                .insert("functionname".into(), "dummy_function_name".into());
        }
        span
    }

    #[test]
    fn test_has_top_level() {
        let top_level_span = create_test_span(123, 1234, 12, 1, true);
        let not_top_level_span = create_test_span(123, 1234, 12, 1, false);
        assert!(has_top_level(&top_level_span));
        assert!(!has_top_level(&not_top_level_span));
    }

    #[test]
    fn test_is_measured() {
        let mut measured_span = create_test_span(123, 1234, 12, 1, true);
        measured_span.metrics.insert(MEASURED_KEY.into(), 1.0);
        let not_measured_span = create_test_span(123, 1234, 12, 1, true);
        assert!(is_measured(&measured_span));
        assert!(!is_measured(&not_measured_span));
    }

    #[test]
    fn test_compute_top_level() {
        let mut span_with_different_service = create_test_span(123, 5, 2, 1, false);
        span_with_different_service.service = "another_service".into();
        let mut trace = vec![
            // Root span (parent id 0): top-level.
            create_test_span(123, 1, 0, 1, false),
            // Parent is span 1 in the same service: not top-level.
            create_test_span(123, 2, 1, 1, false),
            // Parent (span 3) is not part of the trace: top-level.
            create_test_span(123, 4, 3, 1, false),
            // Parent is span 2 but the service differs: top-level.
            span_with_different_service,
        ];

        compute_top_level_span(trace.as_mut_slice());

        let spans_marked_as_top_level: Vec<u64> = trace
            .iter()
            .filter_map(|span| has_top_level(span).then_some(span.span_id))
            .collect();
        assert_eq!(spans_marked_as_top_level, [1, 4, 5]);
    }

    #[test]
    fn test_drop_chunks() {
        // Positive priority: the chunk is kept as is.
        let chunk_with_priority = vec![
            SpanBytes {
                span_id: 1,
                metrics: HashMap::from([
                    (SAMPLING_PRIORITY_KEY.into(), 1.0),
                    (TRACER_TOP_LEVEL_KEY.into(), 1.0),
                ]),
                ..Default::default()
            },
            SpanBytes {
                span_id: 2,
                parent_id: 1,
                ..Default::default()
            },
        ];
        // Priority 0 and nothing individually sampled: the chunk is dropped.
        let chunk_with_null_priority = vec![
            SpanBytes {
                span_id: 1,
                metrics: HashMap::from([
                    (SAMPLING_PRIORITY_KEY.into(), 0.0),
                    (TRACER_TOP_LEVEL_KEY.into(), 1.0),
                ]),
                ..Default::default()
            },
            SpanBytes {
                span_id: 2,
                parent_id: 1,
                ..Default::default()
            },
        ];
        // No priority at all: the chunk is kept as is.
        let chunk_without_priority = vec![
            SpanBytes {
                span_id: 1,
                metrics: HashMap::from([(TRACER_TOP_LEVEL_KEY.into(), 1.0)]),
                ..Default::default()
            },
            SpanBytes {
                span_id: 2,
                parent_id: 1,
                ..Default::default()
            },
        ];
        // Negative priority, several top-level spans: the chunk is dropped.
        let chunk_with_multiple_top_level = vec![
            SpanBytes {
                span_id: 1,
                metrics: HashMap::from([
                    (SAMPLING_PRIORITY_KEY.into(), -1.0),
                    (TRACER_TOP_LEVEL_KEY.into(), 1.0),
                ]),
                ..Default::default()
            },
            SpanBytes {
                span_id: 2,
                parent_id: 1,
                ..Default::default()
            },
            SpanBytes {
                span_id: 4,
                parent_id: 3,
                metrics: HashMap::from([(TRACER_TOP_LEVEL_KEY.into(), 1.0)]),
                ..Default::default()
            },
        ];
        // Priority 0 but one span has an error: the chunk is kept as is.
        let chunk_with_error = vec![
            SpanBytes {
                span_id: 1,
                error: 1,
                metrics: HashMap::from([
                    (SAMPLING_PRIORITY_KEY.into(), 0.0),
                    (TRACER_TOP_LEVEL_KEY.into(), 1.0),
                ]),
                ..Default::default()
            },
            SpanBytes {
                span_id: 2,
                parent_id: 1,
                ..Default::default()
            },
        ];
        // Priority 0, one span kept through the span-sampling mechanism metric.
        let chunk_with_a_single_span = vec![
            SpanBytes {
                span_id: 1,
                metrics: HashMap::from([
                    (SAMPLING_PRIORITY_KEY.into(), 0.0),
                    (TRACER_TOP_LEVEL_KEY.into(), 1.0),
                ]),
                ..Default::default()
            },
            SpanBytes {
                span_id: 2,
                parent_id: 1,
                metrics: HashMap::from([(SAMPLING_SINGLE_SPAN_MECHANISM.into(), 8.0)]),
                ..Default::default()
            },
        ];
        // Priority 0, one span kept because it carries the analytics rate metric.
        let chunk_with_analyzed_span = vec![
            SpanBytes {
                span_id: 1,
                metrics: HashMap::from([
                    (SAMPLING_PRIORITY_KEY.into(), 0.0),
                    (TRACER_TOP_LEVEL_KEY.into(), 1.0),
                ]),
                ..Default::default()
            },
            SpanBytes {
                span_id: 2,
                parent_id: 1,
                metrics: HashMap::from([(SAMPLING_ANALYTICS_RATE_KEY.into(), 1.0)]),
                ..Default::default()
            },
        ];

        // Each chunk paired with the number of spans expected to survive; `0` means the
        // chunk itself must be removed.
        let chunks_and_expected_sampled_spans = vec![
            (chunk_with_priority, 2),
            (chunk_with_null_priority, 0),
            (chunk_without_priority, 2),
            (chunk_with_multiple_top_level, 0),
            (chunk_with_error, 2),
            (chunk_with_a_single_span, 1),
            (chunk_with_analyzed_span, 1),
        ];

        for (chunk, expected_count) in chunks_and_expected_sampled_spans {
            let mut traces = vec![chunk];
            drop_chunks(&mut traces);

            if expected_count == 0 {
                assert!(traces.is_empty());
            } else {
                assert_eq!(traces[0].len(), expected_count);
            }
        }
    }
}