libdd_trace_utils/
trace_utils.rs

1// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4pub use crate::send_data::send_data_result::SendDataResult;
5pub use crate::send_data::SendData;
6use crate::span::v05::dict::SharedDict;
7use crate::span::{v05, SpanText};
8pub use crate::tracer_header_tags::TracerHeaderTags;
9use crate::tracer_payload::TracerPayloadCollection;
10use crate::tracer_payload::{self, TraceChunks};
11use anyhow::anyhow;
12use bytes::buf::Reader;
13use http_body_util::BodyExt;
14use hyper::body::Buf;
15use libdd_common::{azure_app_services, hyper_migration};
16use libdd_trace_normalization::normalizer;
17use libdd_trace_protobuf::pb;
18use rmp::decode::read_array_len;
19use rmpv::decode::read_value;
20use rmpv::{Integer, Value};
21use std::cmp::Ordering;
22use std::collections::{HashMap, HashSet};
23use std::env;
24use tracing::{debug, error};
25
/// Upper bound on the size of a single request sent to the trace agent. Payloads
/// larger than this size will be dropped and the agent will return a 413 error if
/// `datadog-send-real-http-status` is set.
pub const MAX_PAYLOAD_SIZE: usize = 25 << 20;
/// Span metric the mini agent must set so the backend recognizes a span as top level.
const TOP_LEVEL_KEY: &str = "_top_level";
/// Span metric a tracer sets when it has already flagged a span as top level.
const TRACER_TOP_LEVEL_KEY: &str = "_dd.top_level";
/// Span metric marking a span as measured (checked by `is_measured`).
const MEASURED_KEY: &str = "_dd.measured";
/// Span metric carried by partial snapshots (checked by `is_partial_snapshot`).
const PARTIAL_VERSION_KEY: &str = "_dd.partial_version";
/// Largest string-dictionary entry count accepted when decoding a v0.5 payload.
const MAX_STRING_DICT_SIZE: u32 = 25 * 1_000 * 1_000;
/// Number of elements in the fixed-size array encoding of a v0.5 span.
const SPAN_ELEMENT_COUNT: usize = 12;
38
39/// First value of returned tuple is the payload size
40pub async fn get_traces_from_request_body(
41    body: hyper_migration::Body,
42) -> anyhow::Result<(usize, Vec<Vec<pb::Span>>)> {
43    let buffer = body.collect().await?.aggregate();
44    let size = buffer.remaining();
45
46    let traces: Vec<Vec<pb::Span>> = match rmp_serde::from_read(buffer.reader()) {
47        Ok(res) => res,
48        Err(err) => {
49            anyhow::bail!("Error deserializing trace from request body: {err}")
50        }
51    };
52
53    Ok((size, traces))
54}
55
56#[inline]
57fn get_v05_strings_dict(reader: &mut Reader<impl Buf>) -> anyhow::Result<Vec<String>> {
58    let dict_size =
59        read_array_len(reader).map_err(|err| anyhow!("Error reading dict size: {err}"))?;
60    if dict_size > MAX_STRING_DICT_SIZE {
61        anyhow::bail!(
62            "Error deserializing strings dictionary. Dict size is too large: {dict_size}"
63        );
64    }
65    let mut dict: Vec<String> = Vec::with_capacity(dict_size.try_into()?);
66    for _ in 0..dict_size {
67        match read_value(reader)? {
68            Value::String(s) => {
69                let parsed_string = s.into_str().ok_or_else(|| anyhow!("Error reading string dict"))?;
70                dict.push(parsed_string);
71            }
72            val => anyhow::bail!("Error deserializing strings dictionary. Value in string dict is not a string: {val}")
73        }
74    }
75    Ok(dict)
76}
77
78#[inline]
79fn get_v05_span(reader: &mut Reader<impl Buf>, dict: &[String]) -> anyhow::Result<pb::Span> {
80    let mut span: pb::Span = Default::default();
81    let span_size = rmp::decode::read_array_len(reader)
82        .map_err(|err| anyhow!("Error reading span size: {err}"))? as usize;
83    if span_size != SPAN_ELEMENT_COUNT {
84        anyhow::bail!("Expected an array of exactly 12 elements in a span, got {span_size}");
85    }
86    // 0 - service
87    span.service = get_v05_string(reader, dict, "service")?;
88    // 1 - name
89    span.name = get_v05_string(reader, dict, "name")?;
90    // 2 - resource
91    span.resource = get_v05_string(reader, dict, "resource")?;
92
93    // 3 - trace_id
94    match read_value(reader)? {
95        Value::Integer(i) => {
96            span.trace_id = i.as_u64().ok_or_else(|| {
97                anyhow!("Error reading span trace_id, value is not an integer: {i}")
98            })?;
99        }
100        val => anyhow::bail!("Error reading span trace_id, value is not an integer: {val}"),
101    };
102    // 4 - span_id
103    match read_value(reader)? {
104        Value::Integer(i) => {
105            span.span_id = i.as_u64().ok_or_else(|| {
106                anyhow!("Error reading span span_id, value is not an integer: {i}")
107            })?;
108        }
109        val => anyhow::bail!("Error reading span span_id, value is not an integer: {val}"),
110    };
111    // 5 - parent_id
112    match read_value(reader)? {
113        Value::Integer(i) => {
114            span.parent_id = i.as_u64().ok_or_else(|| {
115                anyhow!("Error reading span parent_id, value is not an integer: {i}")
116            })?;
117        }
118        val => anyhow::bail!("Error reading span parent_id, value is not an integer: {val}"),
119    };
120    // 6 - start
121    match read_value(reader)? {
122        Value::Integer(i) => {
123            span.start = i
124                .as_i64()
125                .ok_or_else(|| anyhow!("Error reading span start, value is not an integer: {i}"))?;
126        }
127        val => anyhow::bail!("Error reading span start, value is not an integer: {val}"),
128    };
129    // 7 - duration
130    match read_value(reader)? {
131        Value::Integer(i) => {
132            span.duration = i.as_i64().ok_or_else(|| {
133                anyhow!("Error reading span duration, value is not an integer: {i}")
134            })?;
135        }
136        val => anyhow::bail!("Error reading span duration, value is not an integer: {val}"),
137    };
138    // 8 - error
139    match read_value(reader)? {
140        Value::Integer(i) => {
141            span.error = i
142                .as_i64()
143                .ok_or_else(|| anyhow!("Error reading span error, value is not an integer: {i}"))?
144                as i32;
145        }
146        val => anyhow::bail!("Error reading span error, value is not an integer: {val}"),
147    }
148    // 9 - meta
149    match read_value(reader)? {
150        Value::Map(meta) => {
151            for (k, v) in meta.iter() {
152                match k {
153                    Value::Integer(k) => {
154                        match v {
155                            Value::Integer(v) => {
156                                let key = str_from_dict(dict, *k)?;
157                                let val = str_from_dict(dict, *v)?;
158                                span.meta.insert(key, val);
159                            }
160                            _ => anyhow::bail!("Error reading span meta, value is not an integer and can't be looked up in dict: {v}")
161                        }
162                    }
163                    _ => anyhow::bail!("Error reading span meta, key is not an integer and can't be looked up in dict: {k}")
164                }
165            }
166        }
167        val => anyhow::bail!("Error reading span meta, value is not a map: {val}"),
168    }
169    // 10 - metrics
170    match read_value(reader)? {
171        Value::Map(metrics) => {
172            for (k, v) in metrics.iter() {
173                match k {
174                    Value::Integer(k) => {
175                        match v {
176                            Value::Integer(v) => {
177                                let key = str_from_dict(dict, *k)?;
178                                span.metrics.insert(key, v.as_f64().ok_or_else(||anyhow!("Error reading span metrics, value is not an integer: {v}"))?);
179                            }
180                            Value::F64(v) => {
181                                let key = str_from_dict(dict, *k)?;
182                                span.metrics.insert(key, *v);
183                            }
184                            _ => anyhow::bail!(
185                                "Error reading span metrics, value is not a float or integer: {v}"
186                            ),
187                        }
188                    }
189                    _ => anyhow::bail!("Error reading span metrics, key is not an integer: {k}"),
190                }
191            }
192        }
193        val => anyhow::bail!("Error reading span metrics, value is not a map: {val}"),
194    }
195
196    // 11 - type
197    match read_value(reader)? {
198        Value::Integer(s) => span.r#type = str_from_dict(dict, s)?,
199        val => anyhow::bail!("Error reading span type, value is not an integer: {val}"),
200    }
201    Ok(span)
202}
203
204#[inline]
205fn str_from_dict(dict: &[String], id: Integer) -> anyhow::Result<String> {
206    let id = id
207        .as_i64()
208        .ok_or_else(|| anyhow!("Error reading string from dict, id is not an integer: {id}"))?
209        as usize;
210    if id >= dict.len() {
211        anyhow::bail!("Error reading string from dict, id out of bounds: {id}");
212    }
213    Ok(dict[id].to_string())
214}
215
216#[inline]
217fn get_v05_string(
218    reader: &mut Reader<impl Buf>,
219    dict: &[String],
220    field_name: &str,
221) -> anyhow::Result<String> {
222    match read_value(reader)? {
223        Value::Integer(s) => {
224            str_from_dict(dict, s)
225        },
226        val => anyhow::bail!("Error reading {field_name}, value is not an integer and can't be looked up in dict: {val}")
227    }
228}
229
230pub async fn get_v05_traces_from_request_body(
231    body: hyper_migration::Body,
232) -> anyhow::Result<(usize, Vec<Vec<pb::Span>>)> {
233    let buffer = body.collect().await?.aggregate();
234    let body_size = buffer.remaining();
235    let mut reader = buffer.reader();
236    let wrapper_size = read_array_len(&mut reader)?;
237    if wrapper_size != 2 {
238        anyhow::bail!("Expected an arrary of exactly 2 elements, got {wrapper_size}");
239    }
240
241    let dict = get_v05_strings_dict(&mut reader)?;
242
243    let traces_size = rmp::decode::read_array_len(&mut reader)?;
244    let mut traces: Vec<Vec<pb::Span>> = Default::default();
245
246    for _ in 0..traces_size {
247        let spans_size = rmp::decode::read_array_len(&mut reader)?;
248        let mut trace: Vec<pb::Span> = Default::default();
249
250        for _ in 0..spans_size {
251            let span = get_v05_span(&mut reader, &dict)?;
252            trace.push(span);
253        }
254        traces.push(trace);
255    }
256    Ok((body_size, traces))
257}
258
/// Payload-level tags lifted from the meta map of a trace's root span.
///
/// All fields are borrowed string slices and default to the empty string.
#[derive(Default)]
pub struct RootSpanTags<'span> {
    /// Taken from the root span's `env` tag.
    pub env: &'span str,
    /// Taken from the root span's `version` tag.
    pub app_version: &'span str,
    /// Taken from the root span's `_dd.hostname` tag.
    pub hostname: &'span str,
    /// Taken from the root span's `runtime-id` tag.
    pub runtime_id: &'span str,
}
267
268pub(crate) fn construct_trace_chunk(trace: Vec<pb::Span>) -> pb::TraceChunk {
269    pb::TraceChunk {
270        priority: normalizer::SamplerPriority::None as i32,
271        origin: "".to_string(),
272        spans: trace,
273        tags: HashMap::new(),
274        dropped_trace: false,
275    }
276}
277
278pub(crate) fn construct_tracer_payload(
279    chunks: Vec<pb::TraceChunk>,
280    tracer_tags: &TracerHeaderTags,
281    root_span_tags: RootSpanTags,
282) -> pb::TracerPayload {
283    pb::TracerPayload {
284        app_version: root_span_tags.app_version.to_string(),
285        language_name: tracer_tags.lang.to_string(),
286        container_id: tracer_tags.container_id.to_string(),
287        env: root_span_tags.env.to_string(),
288        runtime_id: root_span_tags.runtime_id.to_string(),
289        chunks,
290        hostname: root_span_tags.hostname.to_string(),
291        language_version: tracer_tags.lang_version.to_string(),
292        tags: HashMap::new(),
293        tracer_version: tracer_tags.tracer_version.to_string(),
294    }
295}
296
297pub(crate) fn cmp_send_data_payloads(a: &pb::TracerPayload, b: &pb::TracerPayload) -> Ordering {
298    a.tracer_version
299        .cmp(&b.tracer_version)
300        .then(a.language_version.cmp(&b.language_version))
301        .then(a.language_name.cmp(&b.language_name))
302        .then(a.hostname.cmp(&b.hostname))
303        .then(a.container_id.cmp(&b.container_id))
304        .then(a.runtime_id.cmp(&b.runtime_id))
305        .then(a.env.cmp(&b.env))
306        .then(a.app_version.cmp(&b.app_version))
307}
308
309pub fn coalesce_send_data(mut data: Vec<SendData>) -> Vec<SendData> {
310    // TODO trace payloads with identical data except for chunk could be merged?
311
312    data.sort_unstable_by(|a, b| {
313        a.get_target()
314            .url
315            .to_string()
316            .cmp(&b.get_target().url.to_string())
317            .then(a.get_target().test_token.cmp(&b.get_target().test_token))
318    });
319    data.dedup_by(|a, b| {
320        if a.get_target().url == b.get_target().url
321            && a.get_target().test_token == b.get_target().test_token
322        {
323            // Size is only an approximation. In practice it won't vary much, but be safe here.
324            // We also don't care about the exact maximum size, like two 25 MB or one 50 MB request
325            // has similar results. The primary goal here is avoiding many small requests.
326            // TODO: maybe make the MAX_PAYLOAD_SIZE configurable?
327            if a.size + b.size < MAX_PAYLOAD_SIZE / 2 {
328                // Note: dedup_by drops a, and retains b.
329                b.tracer_payloads.append(&mut a.tracer_payloads);
330                b.size += a.size;
331                return true;
332            }
333        }
334        false
335    });
336    // Merge chunks with common properties. Reduces requests for agentful mode.
337    // And reduces a little bit of data for agentless.
338    for send_data in data.iter_mut() {
339        send_data.tracer_payloads.merge();
340    }
341    data
342}
343
344pub fn get_root_span_index(trace: &[pb::Span]) -> anyhow::Result<usize> {
345    if trace.is_empty() {
346        anyhow::bail!("Cannot find root span index in an empty trace.");
347    }
348
349    // Do a first pass to find if we have an obvious root span (starting from the end) since some
350    // clients put the root span last.
351    for (i, span) in trace.iter().enumerate().rev() {
352        if span.parent_id == 0 {
353            return Ok(i);
354        }
355    }
356
357    let mut span_ids: HashSet<u64> = HashSet::with_capacity(trace.len());
358    for span in trace.iter() {
359        span_ids.insert(span.span_id);
360    }
361
362    let mut root_span_id = None;
363    for (i, span) in trace.iter().enumerate() {
364        // If a span's parent is not in the trace, it is a root
365        if !span_ids.contains(&span.parent_id) {
366            if root_span_id.is_some() {
367                debug!(
368                    trace_id = &trace[0].trace_id,
369                    "trace has multiple root spans"
370                );
371            }
372            root_span_id = Some(i);
373        }
374    }
375    Ok(match root_span_id {
376        Some(i) => i,
377        None => {
378            debug!(
379                trace_id = &trace[0].trace_id,
380                "Could not find the root span for trace"
381            );
382            trace.len() - 1
383        }
384    })
385}
386
387/// Updates all the spans top-level attribute.
388/// A span is considered top-level if:
389///   - it's a root span
390///   - OR its parent is unknown (other part of the code, distributed trace)
391///   - OR its parent belongs to another service (in that case it's a "local root" being the highest
392///     ancestor of other spans belonging to this service and attached to it).
393pub fn compute_top_level_span(trace: &mut [pb::Span]) {
394    let mut span_id_to_service: HashMap<u64, String> = HashMap::new();
395    for span in trace.iter() {
396        span_id_to_service.insert(span.span_id, span.service.clone());
397    }
398    for span in trace.iter_mut() {
399        if span.parent_id == 0 {
400            set_top_level_span(span, true);
401            continue;
402        }
403        match span_id_to_service.get(&span.parent_id) {
404            Some(parent_span_service) => {
405                if !parent_span_service.eq(&span.service) {
406                    // parent is not in the same service
407                    set_top_level_span(span, true)
408                }
409            }
410            None => {
411                // span has no parent in chunk
412                set_top_level_span(span, true)
413            }
414        }
415    }
416}
417
418/// Return true if the span has a top level key set
419pub fn has_top_level(span: &pb::Span) -> bool {
420    span.metrics
421        .get(TRACER_TOP_LEVEL_KEY)
422        .is_some_and(|v| *v == 1.0)
423        || span.metrics.get(TOP_LEVEL_KEY).is_some_and(|v| *v == 1.0)
424}
425
426fn set_top_level_span(span: &mut pb::Span, is_top_level: bool) {
427    if is_top_level {
428        span.metrics.insert(TOP_LEVEL_KEY.to_string(), 1.0);
429    } else {
430        span.metrics.remove(TOP_LEVEL_KEY);
431    }
432}
433
434pub fn set_serverless_root_span_tags(
435    span: &mut pb::Span,
436    app_name: Option<String>,
437    env_type: &EnvironmentType,
438) {
439    let origin_tag = match env_type {
440        EnvironmentType::CloudFunction => "cloudfunction",
441        EnvironmentType::AzureFunction => "azurefunction",
442        EnvironmentType::AzureSpringApp => "azurespringapp",
443        EnvironmentType::LambdaFunction => "lambda", // historical reasons
444    };
445    span.meta
446        .insert("_dd.origin".to_string(), origin_tag.to_string());
447    span.meta
448        .insert("origin".to_string(), origin_tag.to_string());
449
450    if let Some(function_name) = app_name {
451        match env_type {
452            EnvironmentType::CloudFunction
453            | EnvironmentType::AzureFunction
454            | EnvironmentType::LambdaFunction => {
455                span.meta.insert("functionname".to_string(), function_name);
456            }
457            _ => {}
458        }
459    }
460}
461
462fn update_tracer_top_level(span: &mut pb::Span) {
463    if span.metrics.contains_key(TRACER_TOP_LEVEL_KEY) {
464        span.metrics.insert(TOP_LEVEL_KEY.to_string(), 1.0);
465    }
466}
467
/// Serverless environment the mini agent runs in.
///
/// `set_serverless_root_span_tags` uses it to pick the origin tag.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EnvironmentType {
    /// Cloud Function (origin tag `cloudfunction`).
    CloudFunction,
    /// Azure Function (origin tag `azurefunction`).
    AzureFunction,
    /// Azure Spring App (origin tag `azurespringapp`).
    AzureSpringApp,
    /// Lambda function (origin tag `lambda`).
    LambdaFunction,
}
475
/// Environment metadata the mini agent attaches to spans.
///
/// Every field is optional. `Default` leaves all of them `None` except `version`. That field
/// is read from the `DD_SERVERLESS_COMPAT_VERSION` environment variable and is `None` when the
/// variable is unset or not valid Unicode.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MiniAgentMetadata {
    /// Emitted as the `asa.hostname` span tag when set.
    pub azure_spring_app_hostname: Option<String>,
    /// Emitted as the `asa.name` span tag when set.
    pub azure_spring_app_name: Option<String>,
    /// Used to build the `gcrfx.*` span tags.
    pub gcp_project_id: Option<String>,
    /// Used to build the `gcrfx.*` span tags.
    pub gcp_region: Option<String>,
    /// Emitted as the `_dd.serverless_compat_version` span tag when set.
    pub version: Option<String>,
}

impl Default for MiniAgentMetadata {
    fn default() -> Self {
        let version = env::var("DD_SERVERLESS_COMPAT_VERSION").ok();
        Self {
            azure_spring_app_hostname: None,
            azure_spring_app_name: None,
            gcp_project_id: None,
            gcp_region: None,
            version,
        }
    }
}
496
497pub fn enrich_span_with_mini_agent_metadata(
498    span: &mut pb::Span,
499    mini_agent_metadata: &MiniAgentMetadata,
500) {
501    if let Some(azure_spring_app_hostname) = &mini_agent_metadata.azure_spring_app_hostname {
502        span.meta.insert(
503            "asa.hostname".to_string(),
504            azure_spring_app_hostname.to_string(),
505        );
506    }
507    if let Some(azure_spring_app_name) = &mini_agent_metadata.azure_spring_app_name {
508        span.meta
509            .insert("asa.name".to_string(), azure_spring_app_name.to_string());
510    }
511    if let Some(serverless_compat_version) = &mini_agent_metadata.version {
512        span.meta.insert(
513            "_dd.serverless_compat_version".to_string(),
514            serverless_compat_version.to_string(),
515        );
516    }
517}
518
519pub fn enrich_span_with_google_cloud_function_metadata(
520    span: &mut pb::Span,
521    mini_agent_metadata: &MiniAgentMetadata,
522    function: Option<String>,
523) {
524    #[allow(clippy::todo)]
525    let Some(region) = &mini_agent_metadata.gcp_region
526    else {
527        todo!()
528    };
529    #[allow(clippy::todo)]
530    let Some(project) = &mini_agent_metadata.gcp_project_id
531    else {
532        todo!()
533    };
534
535    #[allow(clippy::unwrap_used)]
536    if function.is_some() && !region.is_empty() && !project.is_empty() {
537        let resource_name = format!(
538            "projects/{}/locations/{}/functions/{}",
539            project,
540            region,
541            function.unwrap()
542        );
543
544        span.meta
545            .insert("gcrfx.location".to_string(), region.to_string());
546        span.meta
547            .insert("gcrfx.project_id".to_string(), project.to_string());
548        span.meta
549            .insert("gcrfx.resource_name".to_string(), resource_name.to_string());
550    }
551}
552
553pub fn enrich_span_with_azure_function_metadata(span: &mut pb::Span) {
554    if let Some(aas_metadata) = &*azure_app_services::AAS_METADATA_FUNCTION {
555        let aas_tags = [
556            ("aas.resource.id", aas_metadata.get_resource_id()),
557            (
558                "aas.environment.instance_id",
559                aas_metadata.get_instance_id(),
560            ),
561            (
562                "aas.environment.instance_name",
563                aas_metadata.get_instance_name(),
564            ),
565            ("aas.subscription.id", aas_metadata.get_subscription_id()),
566            ("aas.environment.os", aas_metadata.get_operating_system()),
567            ("aas.environment.runtime", aas_metadata.get_runtime()),
568            (
569                "aas.environment.runtime_version",
570                aas_metadata.get_runtime_version(),
571            ),
572            (
573                "aas.environment.function_runtime",
574                aas_metadata.get_function_runtime_version(),
575            ),
576            ("aas.resource.group", aas_metadata.get_resource_group()),
577            ("aas.site.name", aas_metadata.get_site_name()),
578            ("aas.site.kind", aas_metadata.get_site_kind()),
579            ("aas.site.type", aas_metadata.get_site_type()),
580        ];
581        aas_tags.into_iter().for_each(|(name, value)| {
582            span.meta.insert(name.to_string(), value.to_string());
583        });
584    }
585}
586
/// Copies tag values from a meta map into struct fields.
///
/// Each `"tag" => target.field,` entry looks `"tag"` up in the given map. If the tag is
/// present, the borrowed value is assigned to `target.field`; if absent, the field is left
/// unchanged. Every entry, including the last one, must end with a comma.
macro_rules! parse_root_span_tags {
    (
        $meta:ident,
        { $($key:literal => $($target:ident).+ ,)+ }
    ) => {
        $(
            if let Some(found) = $meta.get($key) {
                $($target).+ = found;
            }
        )+
    };
}
600
601pub fn collect_trace_chunks<T: SpanText>(
602    traces: Vec<Vec<crate::span::Span<T>>>,
603    use_v05_format: bool,
604) -> anyhow::Result<TraceChunks<T>> {
605    if use_v05_format {
606        let mut shared_dict = SharedDict::default();
607        let mut v05_traces: Vec<Vec<v05::Span>> = Vec::with_capacity(traces.len());
608        for trace in traces {
609            let trace_len = trace.len();
610            let v05_trace = trace.into_iter().try_fold(
611                Vec::with_capacity(trace_len),
612                |mut acc, span| -> anyhow::Result<Vec<v05::Span>> {
613                    acc.push(v05::from_span(span, &mut shared_dict)?);
614                    Ok(acc)
615                },
616            )?;
617
618            v05_traces.push(v05_trace);
619        }
620        Ok(TraceChunks::V05((shared_dict, v05_traces)))
621    } else {
622        Ok(TraceChunks::V04(traces))
623    }
624}
625
626pub fn collect_pb_trace_chunks<T: tracer_payload::TraceChunkProcessor>(
627    mut traces: Vec<Vec<pb::Span>>,
628    tracer_header_tags: &TracerHeaderTags,
629    process_chunk: &mut T,
630    is_agentless: bool,
631) -> anyhow::Result<TracerPayloadCollection> {
632    let mut trace_chunks: Vec<pb::TraceChunk> = Vec::new();
633
634    // We'll skip setting the global metadata and rely on the agent to unpack these
635    let mut gathered_root_span_tags = !is_agentless;
636    let mut root_span_tags = RootSpanTags::default();
637
638    for trace in traces.iter_mut() {
639        if is_agentless {
640            if let Err(e) = normalizer::normalize_trace(trace) {
641                error!("Error normalizing trace: {e}");
642            }
643        }
644
645        let mut chunk = construct_trace_chunk(trace.to_vec());
646
647        let root_span_index = match get_root_span_index(trace) {
648            Ok(res) => res,
649            Err(e) => {
650                error!("Error getting the root span index of a trace, skipping. {e}");
651                continue;
652            }
653        };
654
655        if let Err(e) = normalizer::normalize_chunk(&mut chunk, root_span_index) {
656            error!("Error normalizing trace chunk: {e}");
657        }
658
659        for span in chunk.spans.iter_mut() {
660            // TODO: obfuscate & truncate spans
661            if tracer_header_tags.client_computed_top_level {
662                update_tracer_top_level(span);
663            }
664        }
665
666        if !tracer_header_tags.client_computed_top_level {
667            compute_top_level_span(&mut chunk.spans);
668        }
669
670        process_chunk.process(&mut chunk, root_span_index);
671
672        trace_chunks.push(chunk);
673
674        if !gathered_root_span_tags {
675            gathered_root_span_tags = true;
676            let meta_map = &trace[root_span_index].meta;
677            parse_root_span_tags!(
678                meta_map,
679                {
680                    "env" => root_span_tags.env,
681                    "version" => root_span_tags.app_version,
682                    "_dd.hostname" => root_span_tags.hostname,
683                    "runtime-id" => root_span_tags.runtime_id,
684                }
685            );
686        }
687    }
688
689    Ok(TracerPayloadCollection::V07(vec![
690        construct_tracer_payload(trace_chunks, tracer_header_tags, root_span_tags),
691    ]))
692}
693
694/// Returns true if a span should be measured (i.e., it should get trace metrics calculated).
695pub fn is_measured(span: &pb::Span) -> bool {
696    span.metrics.get(MEASURED_KEY).is_some_and(|v| *v == 1.0)
697}
698
699/// Returns true if the span is a partial snapshot.
700/// This kind of spans are partial images of long-running spans.
701/// When incomplete, a partial snapshot has a metric _dd.partial_version which is a positive
702/// integer. The metric usually increases each time a new version of the same span is sent by the
703/// tracer
704pub fn is_partial_snapshot(span: &pb::Span) -> bool {
705    span.metrics
706        .get(PARTIAL_VERSION_KEY)
707        .is_some_and(|v| *v >= 0.0)
708}
709
710#[cfg(test)]
711mod tests {
712    use super::*;
713    use crate::{
714        span::SharedDictBytes,
715        test_utils::{create_test_no_alloc_span, create_test_span},
716    };
717    use hyper::Request;
718    use libdd_common::Endpoint;
719    use serde_json::json;
720
721    fn find_index_in_dict(dict: &SharedDictBytes, value: &str) -> Option<u32> {
722        let idx = dict.iter().position(|e| e.as_str() == value);
723        idx.map(|idx| idx.try_into().unwrap())
724    }
725
726    #[test]
727    fn test_coalescing_does_not_exceed_max_size() {
728        let dummy = SendData::new(
729            MAX_PAYLOAD_SIZE / 5 + 1,
730            TracerPayloadCollection::V07(vec![pb::TracerPayload {
731                container_id: "".to_string(),
732                language_name: "".to_string(),
733                language_version: "".to_string(),
734                tracer_version: "".to_string(),
735                runtime_id: "".to_string(),
736                chunks: vec![pb::TraceChunk {
737                    priority: 0,
738                    origin: "".to_string(),
739                    spans: vec![],
740                    tags: Default::default(),
741                    dropped_trace: false,
742                }],
743                tags: Default::default(),
744                env: "".to_string(),
745                hostname: "".to_string(),
746                app_version: "".to_string(),
747            }]),
748            TracerHeaderTags::default(),
749            &Endpoint::default(),
750        );
751        let coalesced = coalesce_send_data(vec![
752            dummy.clone(),
753            dummy.clone(),
754            dummy.clone(),
755            dummy.clone(),
756            dummy.clone(),
757        ]);
758        assert_eq!(
759            5,
760            coalesced
761                .iter()
762                .map(|s| s.tracer_payloads.size())
763                .sum::<usize>()
764        );
765        // assert some chunks are actually coalesced
766        assert!(
767            coalesced
768                .iter()
769                .map(|s| {
770                    if let TracerPayloadCollection::V07(collection) = &s.tracer_payloads {
771                        collection.iter().map(|s| s.chunks.len()).max().unwrap()
772                    } else {
773                        0
774                    }
775                })
776                .max()
777                .unwrap()
778                > 1
779        );
780        assert!(coalesced.len() > 1 && coalesced.len() < 5);
781    }
782
783    #[tokio::test]
784    #[allow(clippy::type_complexity)]
785    #[cfg_attr(all(miri, target_os = "macos"), ignore)]
786    async fn test_get_v05_traces_from_request_body() {
787        let data: (
788            Vec<String>,
789            Vec<
790                Vec<(
791                    u8,
792                    u8,
793                    u8,
794                    u64,
795                    u64,
796                    u64,
797                    i64,
798                    i64,
799                    i32,
800                    HashMap<u8, u8>,
801                    HashMap<u8, f64>,
802                    u8,
803                )>,
804            >,
805        ) = (
806            vec![
807                "baggage".to_string(),
808                "item".to_string(),
809                "elasticsearch.version".to_string(),
810                "7.0".to_string(),
811                "my-name".to_string(),
812                "X".to_string(),
813                "my-service".to_string(),
814                "my-resource".to_string(),
815                "_dd.sampling_rate_whatever".to_string(),
816                "value whatever".to_string(),
817                "sql".to_string(),
818            ],
819            vec![vec![(
820                6,
821                4,
822                7,
823                1,
824                2,
825                3,
826                123,
827                456,
828                1,
829                HashMap::from([(8, 9), (0, 1), (2, 3)]),
830                HashMap::from([(5, 1.2)]),
831                10,
832            )]],
833        );
834        let bytes = rmp_serde::to_vec(&data).unwrap();
835        let res = get_v05_traces_from_request_body(hyper_migration::Body::from(bytes)).await;
836        assert!(res.is_ok());
837        let (_, traces) = res.unwrap();
838        let span = traces[0][0].clone();
839        let test_span = pb::Span {
840            service: "my-service".to_string(),
841            name: "my-name".to_string(),
842            resource: "my-resource".to_string(),
843            trace_id: 1,
844            span_id: 2,
845            parent_id: 3,
846            start: 123,
847            duration: 456,
848            error: 1,
849            meta: HashMap::from([
850                ("baggage".to_string(), "item".to_string()),
851                ("elasticsearch.version".to_string(), "7.0".to_string()),
852                (
853                    "_dd.sampling_rate_whatever".to_string(),
854                    "value whatever".to_string(),
855                ),
856            ]),
857            metrics: HashMap::from([("X".to_string(), 1.2)]),
858            meta_struct: HashMap::default(),
859            r#type: "sql".to_string(),
860            span_links: vec![],
861            span_events: vec![],
862        };
863        assert_eq!(span, test_span);
864    }
865
866    #[tokio::test]
867    #[cfg_attr(miri, ignore)]
868    async fn test_get_traces_from_request_body() {
869        let pairs = vec![
870            (
871                json!([{
872                    "service": "test-service",
873                    "name": "test-service-name",
874                    "resource": "test-service-resource",
875                    "trace_id": 111,
876                    "span_id": 222,
877                    "parent_id": 333,
878                    "start": 1,
879                    "duration": 5,
880                    "error": 0,
881                    "meta": {},
882                    "metrics": {},
883                }]),
884                vec![vec![pb::Span {
885                    service: "test-service".to_string(),
886                    name: "test-service-name".to_string(),
887                    resource: "test-service-resource".to_string(),
888                    trace_id: 111,
889                    span_id: 222,
890                    parent_id: 333,
891                    start: 1,
892                    duration: 5,
893                    error: 0,
894                    meta: HashMap::new(),
895                    metrics: HashMap::new(),
896                    meta_struct: HashMap::new(),
897                    r#type: "".to_string(),
898                    span_links: vec![],
899                    span_events: vec![],
900                }]],
901            ),
902            (
903                json!([{
904                    "name": "test-service-name",
905                    "resource": "test-service-resource",
906                    "trace_id": 111,
907                    "span_id": 222,
908                    "start": 1,
909                    "duration": 5,
910                    "meta": {},
911                }]),
912                vec![vec![pb::Span {
913                    service: "".to_string(),
914                    name: "test-service-name".to_string(),
915                    resource: "test-service-resource".to_string(),
916                    trace_id: 111,
917                    span_id: 222,
918                    parent_id: 0,
919                    start: 1,
920                    duration: 5,
921                    error: 0,
922                    meta: HashMap::new(),
923                    metrics: HashMap::new(),
924                    meta_struct: HashMap::new(),
925                    r#type: "".to_string(),
926                    span_links: vec![],
927                    span_events: vec![],
928                }]],
929            ),
930        ];
931
932        for (trace_input, output) in pairs {
933            let bytes = rmp_serde::to_vec(&vec![&trace_input]).unwrap();
934            let request = Request::builder()
935                .body(hyper_migration::Body::from(bytes))
936                .unwrap();
937            let res = get_traces_from_request_body(request.into_body()).await;
938            assert!(res.is_ok());
939            assert_eq!(res.unwrap().1, output);
940        }
941    }
942
943    #[tokio::test]
944    #[cfg_attr(miri, ignore)]
945    async fn test_get_traces_from_request_body_with_span_links() {
946        let trace_input = json!([[{
947            "service": "test-service",
948            "name": "test-name",
949            "resource": "test-resource",
950            "trace_id": 111,
951            "span_id": 222,
952            "parent_id": 333,
953            "start": 1,
954            "duration": 5,
955            "error": 0,
956            "meta": {},
957            "metrics": {},
958            "span_links": [{
959                "trace_id": 999,
960                "span_id": 888,
961                "trace_id_high": 777,
962                "attributes": {"key": "value"},
963                "tracestate": "vendor=value"
964                // flags field intentionally omitted
965            }]
966        }]]);
967
968        let expected_output = vec![vec![pb::Span {
969            service: "test-service".to_string(),
970            name: "test-name".to_string(),
971            resource: "test-resource".to_string(),
972            trace_id: 111,
973            span_id: 222,
974            parent_id: 333,
975            start: 1,
976            duration: 5,
977            error: 0,
978            meta: HashMap::new(),
979            metrics: HashMap::new(),
980            meta_struct: HashMap::new(),
981            r#type: String::new(),
982            span_links: vec![pb::SpanLink {
983                trace_id: 999,
984                span_id: 888,
985                trace_id_high: 777,
986                attributes: HashMap::from([("key".to_string(), "value".to_string())]),
987                tracestate: "vendor=value".to_string(),
988                flags: 0, // Should default to 0 when omitted
989            }],
990            span_events: vec![],
991        }]];
992
993        let bytes = rmp_serde::to_vec(&trace_input).unwrap();
994        let request = Request::builder()
995            .body(hyper_migration::Body::from(bytes))
996            .unwrap();
997
998        let res = get_traces_from_request_body(request.into_body()).await;
999        assert!(res.is_ok(), "Failed to deserialize: {res:?}");
1000        assert_eq!(res.unwrap().1, expected_output);
1001    }
1002
1003    #[test]
1004    fn test_get_root_span_index_from_complete_trace() {
1005        let trace = vec![
1006            create_test_span(1234, 12341, 0, 1, false),
1007            create_test_span(1234, 12342, 12341, 1, false),
1008            create_test_span(1234, 12343, 12342, 1, false),
1009        ];
1010
1011        let root_span_index = get_root_span_index(&trace);
1012        assert!(root_span_index.is_ok());
1013        assert_eq!(root_span_index.unwrap(), 0);
1014    }
1015
1016    #[test]
1017    fn test_get_root_span_index_from_partial_trace() {
1018        let trace = vec![
1019            create_test_span(1234, 12342, 12341, 1, false),
1020            create_test_span(1234, 12341, 12340, 1, false), /* this is the root span, it's
1021                                                             * parent is not in the trace */
1022            create_test_span(1234, 12343, 12342, 1, false),
1023        ];
1024
1025        let root_span_index = get_root_span_index(&trace);
1026        assert!(root_span_index.is_ok());
1027        assert_eq!(root_span_index.unwrap(), 1);
1028    }
1029
1030    #[test]
1031    fn test_set_serverless_root_span_tags_azure_function() {
1032        let mut span = create_test_span(1234, 12342, 12341, 1, false);
1033        set_serverless_root_span_tags(
1034            &mut span,
1035            Some("test_function".to_string()),
1036            &EnvironmentType::AzureFunction,
1037        );
1038        assert_eq!(
1039            span.meta,
1040            HashMap::from([
1041                (
1042                    "runtime-id".to_string(),
1043                    "test-runtime-id-value".to_string()
1044                ),
1045                ("_dd.origin".to_string(), "azurefunction".to_string()),
1046                ("origin".to_string(), "azurefunction".to_string()),
1047                ("functionname".to_string(), "test_function".to_string()),
1048                ("env".to_string(), "test-env".to_string()),
1049                ("service".to_string(), "test-service".to_string())
1050            ]),
1051        );
1052    }
1053
1054    #[test]
1055    fn test_set_serverless_root_span_tags_cloud_function() {
1056        let mut span = create_test_span(1234, 12342, 12341, 1, false);
1057        set_serverless_root_span_tags(
1058            &mut span,
1059            Some("test_function".to_string()),
1060            &EnvironmentType::CloudFunction,
1061        );
1062        assert_eq!(
1063            span.meta,
1064            HashMap::from([
1065                (
1066                    "runtime-id".to_string(),
1067                    "test-runtime-id-value".to_string()
1068                ),
1069                ("_dd.origin".to_string(), "cloudfunction".to_string()),
1070                ("origin".to_string(), "cloudfunction".to_string()),
1071                ("functionname".to_string(), "test_function".to_string()),
1072                ("env".to_string(), "test-env".to_string()),
1073                ("service".to_string(), "test-service".to_string())
1074            ]),
1075        );
1076    }
1077
1078    #[test]
1079    fn test_has_top_level() {
1080        let top_level_span = create_test_span(123, 1234, 12, 1, true);
1081        let not_top_level_span = create_test_span(123, 1234, 12, 1, false);
1082        assert!(has_top_level(&top_level_span));
1083        assert!(!has_top_level(&not_top_level_span));
1084    }
1085
1086    #[test]
1087    fn test_is_measured() {
1088        let mut measured_span = create_test_span(123, 1234, 12, 1, true);
1089        measured_span.metrics.insert(MEASURED_KEY.into(), 1.0);
1090        let not_measured_span = create_test_span(123, 1234, 12, 1, true);
1091        assert!(is_measured(&measured_span));
1092        assert!(!is_measured(&not_measured_span));
1093    }
1094
1095    #[test]
1096    fn test_compute_top_level() {
1097        let mut span_with_different_service = create_test_span(123, 5, 2, 1, false);
1098        span_with_different_service.service = "another_service".into();
1099        let mut trace = vec![
1100            // Root span, should be marked as top-level
1101            create_test_span(123, 1, 0, 1, false),
1102            // Should not be marked as top-level
1103            create_test_span(123, 2, 1, 1, false),
1104            // No parent in local trace, should be marked as
1105            // top-level
1106            create_test_span(123, 4, 3, 1, false),
1107            // Parent belongs to another service, should be marked
1108            // as top-level
1109            span_with_different_service,
1110        ];
1111
1112        compute_top_level_span(trace.as_mut_slice());
1113
1114        let spans_marked_as_top_level: Vec<u64> = trace
1115            .iter()
1116            .filter_map(|span| {
1117                if has_top_level(span) {
1118                    Some(span.span_id)
1119                } else {
1120                    None
1121                }
1122            })
1123            .collect();
1124        assert_eq!(spans_marked_as_top_level, [1, 4, 5])
1125    }
1126
1127    #[test]
1128    fn test_collect_trace_chunks_v05() {
1129        let chunk = vec![create_test_no_alloc_span(123, 456, 789, 1, true)];
1130
1131        let collection = collect_trace_chunks(vec![chunk], true).unwrap();
1132
1133        let (dict, traces) = match collection {
1134            TraceChunks::V05(payload) => payload,
1135            _ => panic!("Unexpected type"),
1136        };
1137
1138        assert_eq!(dict.len(), 16);
1139
1140        let span = &traces[0][0];
1141        assert_eq!(span.service, 1);
1142        assert_eq!(span.name, 2);
1143        assert_eq!(span.resource, 3);
1144        assert_eq!(span.trace_id, 123);
1145        assert_eq!(span.span_id, 456);
1146        assert_eq!(span.parent_id, 789);
1147        assert_eq!(span.start, 1);
1148        assert_eq!(span.error, 0);
1149        assert_eq!(span.error, 0);
1150        assert_eq!(span.r#type, 15);
1151        assert_eq!(
1152            *span
1153                .meta
1154                .get(&find_index_in_dict(&dict, "service").unwrap())
1155                .unwrap(),
1156            find_index_in_dict(&dict, "test-service").unwrap()
1157        );
1158        assert_eq!(
1159            *span
1160                .meta
1161                .get(&find_index_in_dict(&dict, "env").unwrap())
1162                .unwrap(),
1163            find_index_in_dict(&dict, "test-env").unwrap()
1164        );
1165        assert_eq!(
1166            *span
1167                .meta
1168                .get(&find_index_in_dict(&dict, "runtime-id").unwrap())
1169                .unwrap(),
1170            find_index_in_dict(&dict, "test-runtime-id-value").unwrap()
1171        );
1172        assert_eq!(
1173            *span
1174                .meta
1175                .get(&find_index_in_dict(&dict, "_dd.origin").unwrap())
1176                .unwrap(),
1177            find_index_in_dict(&dict, "cloudfunction").unwrap()
1178        );
1179        assert_eq!(
1180            *span
1181                .meta
1182                .get(&find_index_in_dict(&dict, "origin").unwrap())
1183                .unwrap(),
1184            find_index_in_dict(&dict, "cloudfunction").unwrap()
1185        );
1186        assert_eq!(
1187            *span
1188                .meta
1189                .get(&find_index_in_dict(&dict, "functionname").unwrap())
1190                .unwrap(),
1191            find_index_in_dict(&dict, "dummy_function_name").unwrap()
1192        );
1193        assert_eq!(
1194            *span
1195                .metrics
1196                .get(&find_index_in_dict(&dict, "_top_level").unwrap())
1197                .unwrap(),
1198            1.0
1199        );
1200    }
1201}