Skip to main content

libdd_trace_utils/
trace_utils.rs

1// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4pub use crate::send_data::send_data_result::SendDataResult;
5pub use crate::send_data::SendData;
6use crate::span::v05::dict::SharedDict;
7use crate::span::{v05, TraceData};
8pub use crate::tracer_header_tags::TracerHeaderTags;
9use crate::tracer_payload::TracerPayloadCollection;
10use crate::tracer_payload::{self, TraceChunks};
11use anyhow::anyhow;
12use bytes::buf::Reader;
13use bytes::Buf;
14use http_body_util::BodyExt;
15use libdd_common::azure_app_services;
16use libdd_trace_normalization::normalizer;
17use libdd_trace_protobuf::pb;
18use rmp::decode::read_array_len;
19use rmpv::decode::read_value;
20use rmpv::{Integer, Value};
21use std::cmp::Ordering;
22use std::collections::{HashMap, HashSet};
23use std::env;
24use tracing::{debug, error};
25
/// The maximum payload size for a single request that can be sent to the trace agent. Payloads
/// larger than this size will be dropped and the agent will return a 413 error if
/// `datadog-send-real-http-status` is set.
pub const MAX_PAYLOAD_SIZE: usize = 25 * 1024 * 1024;
/// Span metric the mini agent must set for the backend to recognize top level span
const TOP_LEVEL_KEY: &str = "_top_level";
/// Span metric the tracer sets to denote a top level span
const TRACER_TOP_LEVEL_KEY: &str = "_dd.top_level";
/// Span metric read by `is_measured`; a value of `1.0` marks the span as measured.
const MEASURED_KEY: &str = "_dd.measured";
/// Span metric read by `is_partial_snapshot`.
const PARTIAL_VERSION_KEY: &str = "_dd.partial_version";
/// Largest string dictionary length accepted when decoding a v0.5 payload.
const MAX_STRING_DICT_SIZE: u32 = 25_000_000;
/// Exact number of elements a v0.5 span array must contain.
const SPAN_ELEMENT_COUNT: usize = 12;
38
39/// First value of returned tuple is the payload size
40pub async fn get_traces_from_request_body<B>(body: B) -> anyhow::Result<(usize, Vec<Vec<pb::Span>>)>
41where
42    B: http_body::Body,
43    B::Error: std::error::Error + Send + Sync + 'static,
44{
45    let buffer = body.collect().await?.aggregate();
46    let size = buffer.remaining();
47
48    let traces: Vec<Vec<pb::Span>> = match rmp_serde::from_read(buffer.reader()) {
49        Ok(res) => res,
50        Err(err) => {
51            anyhow::bail!("Error deserializing trace from request body: {err}")
52        }
53    };
54
55    Ok((size, traces))
56}
57
58#[inline]
59fn get_v05_strings_dict(reader: &mut Reader<impl Buf>) -> anyhow::Result<Vec<String>> {
60    let dict_size =
61        read_array_len(reader).map_err(|err| anyhow!("Error reading dict size: {err}"))?;
62    if dict_size > MAX_STRING_DICT_SIZE {
63        anyhow::bail!(
64            "Error deserializing strings dictionary. Dict size is too large: {dict_size}"
65        );
66    }
67    let mut dict: Vec<String> = Vec::with_capacity(dict_size.try_into()?);
68    for _ in 0..dict_size {
69        match read_value(reader)? {
70            Value::String(s) => {
71                let parsed_string = s.into_str().ok_or_else(|| anyhow!("Error reading string dict"))?;
72                dict.push(parsed_string);
73            }
74            val => anyhow::bail!("Error deserializing strings dictionary. Value in string dict is not a string: {val}")
75        }
76    }
77    Ok(dict)
78}
79
80#[inline]
81fn get_v05_span(reader: &mut Reader<impl Buf>, dict: &[String]) -> anyhow::Result<pb::Span> {
82    let mut span: pb::Span = Default::default();
83    let span_size = rmp::decode::read_array_len(reader)
84        .map_err(|err| anyhow!("Error reading span size: {err}"))? as usize;
85    if span_size != SPAN_ELEMENT_COUNT {
86        anyhow::bail!("Expected an array of exactly 12 elements in a span, got {span_size}");
87    }
88    // 0 - service
89    span.service = get_v05_string(reader, dict, "service")?;
90    // 1 - name
91    span.name = get_v05_string(reader, dict, "name")?;
92    // 2 - resource
93    span.resource = get_v05_string(reader, dict, "resource")?;
94
95    // 3 - trace_id
96    match read_value(reader)? {
97        Value::Integer(i) => {
98            span.trace_id = i.as_u64().ok_or_else(|| {
99                anyhow!("Error reading span trace_id, value is not an integer: {i}")
100            })?;
101        }
102        val => anyhow::bail!("Error reading span trace_id, value is not an integer: {val}"),
103    };
104    // 4 - span_id
105    match read_value(reader)? {
106        Value::Integer(i) => {
107            span.span_id = i.as_u64().ok_or_else(|| {
108                anyhow!("Error reading span span_id, value is not an integer: {i}")
109            })?;
110        }
111        val => anyhow::bail!("Error reading span span_id, value is not an integer: {val}"),
112    };
113    // 5 - parent_id
114    match read_value(reader)? {
115        Value::Integer(i) => {
116            span.parent_id = i.as_u64().ok_or_else(|| {
117                anyhow!("Error reading span parent_id, value is not an integer: {i}")
118            })?;
119        }
120        val => anyhow::bail!("Error reading span parent_id, value is not an integer: {val}"),
121    };
122    // 6 - start
123    match read_value(reader)? {
124        Value::Integer(i) => {
125            span.start = i
126                .as_i64()
127                .ok_or_else(|| anyhow!("Error reading span start, value is not an integer: {i}"))?;
128        }
129        val => anyhow::bail!("Error reading span start, value is not an integer: {val}"),
130    };
131    // 7 - duration
132    match read_value(reader)? {
133        Value::Integer(i) => {
134            span.duration = i.as_i64().ok_or_else(|| {
135                anyhow!("Error reading span duration, value is not an integer: {i}")
136            })?;
137        }
138        val => anyhow::bail!("Error reading span duration, value is not an integer: {val}"),
139    };
140    // 8 - error
141    match read_value(reader)? {
142        Value::Integer(i) => {
143            span.error = i
144                .as_i64()
145                .ok_or_else(|| anyhow!("Error reading span error, value is not an integer: {i}"))?
146                as i32;
147        }
148        val => anyhow::bail!("Error reading span error, value is not an integer: {val}"),
149    }
150    // 9 - meta
151    match read_value(reader)? {
152        Value::Map(meta) => {
153            for (k, v) in meta.iter() {
154                match k {
155                    Value::Integer(k) => {
156                        match v {
157                            Value::Integer(v) => {
158                                let key = str_from_dict(dict, *k)?;
159                                let val = str_from_dict(dict, *v)?;
160                                span.meta.insert(key, val);
161                            }
162                            _ => anyhow::bail!("Error reading span meta, value is not an integer and can't be looked up in dict: {v}")
163                        }
164                    }
165                    _ => anyhow::bail!("Error reading span meta, key is not an integer and can't be looked up in dict: {k}")
166                }
167            }
168        }
169        val => anyhow::bail!("Error reading span meta, value is not a map: {val}"),
170    }
171    // 10 - metrics
172    match read_value(reader)? {
173        Value::Map(metrics) => {
174            for (k, v) in metrics.iter() {
175                match k {
176                    Value::Integer(k) => {
177                        match v {
178                            Value::Integer(v) => {
179                                let key = str_from_dict(dict, *k)?;
180                                span.metrics.insert(key, v.as_f64().ok_or_else(||anyhow!("Error reading span metrics, value is not an integer: {v}"))?);
181                            }
182                            Value::F64(v) => {
183                                let key = str_from_dict(dict, *k)?;
184                                span.metrics.insert(key, *v);
185                            }
186                            _ => anyhow::bail!(
187                                "Error reading span metrics, value is not a float or integer: {v}"
188                            ),
189                        }
190                    }
191                    _ => anyhow::bail!("Error reading span metrics, key is not an integer: {k}"),
192                }
193            }
194        }
195        val => anyhow::bail!("Error reading span metrics, value is not a map: {val}"),
196    }
197
198    // 11 - type
199    match read_value(reader)? {
200        Value::Integer(s) => span.r#type = str_from_dict(dict, s)?,
201        val => anyhow::bail!("Error reading span type, value is not an integer: {val}"),
202    }
203    Ok(span)
204}
205
206#[inline]
207fn str_from_dict(dict: &[String], id: Integer) -> anyhow::Result<String> {
208    let id = id
209        .as_i64()
210        .ok_or_else(|| anyhow!("Error reading string from dict, id is not an integer: {id}"))?
211        as usize;
212    if id >= dict.len() {
213        anyhow::bail!("Error reading string from dict, id out of bounds: {id}");
214    }
215    Ok(dict[id].to_string())
216}
217
218#[inline]
219fn get_v05_string(
220    reader: &mut Reader<impl Buf>,
221    dict: &[String],
222    field_name: &str,
223) -> anyhow::Result<String> {
224    match read_value(reader)? {
225        Value::Integer(s) => {
226            str_from_dict(dict, s)
227        },
228        val => anyhow::bail!("Error reading {field_name}, value is not an integer and can't be looked up in dict: {val}")
229    }
230}
231
232pub async fn get_v05_traces_from_request_body<B>(
233    body: B,
234) -> anyhow::Result<(usize, Vec<Vec<pb::Span>>)>
235where
236    B: http_body::Body,
237    B::Error: std::error::Error + Send + Sync + 'static,
238{
239    let buffer = body.collect().await?.aggregate();
240    let body_size = buffer.remaining();
241    let mut reader = buffer.reader();
242    let wrapper_size = read_array_len(&mut reader)?;
243    if wrapper_size != 2 {
244        anyhow::bail!("Expected an arrary of exactly 2 elements, got {wrapper_size}");
245    }
246
247    let dict = get_v05_strings_dict(&mut reader)?;
248
249    let traces_size = rmp::decode::read_array_len(&mut reader)?;
250    let mut traces: Vec<Vec<pb::Span>> = Default::default();
251
252    for _ in 0..traces_size {
253        let spans_size = rmp::decode::read_array_len(&mut reader)?;
254        let mut trace: Vec<pb::Span> = Default::default();
255
256        for _ in 0..spans_size {
257            let span = get_v05_span(&mut reader, &dict)?;
258            trace.push(span);
259        }
260        traces.push(trace);
261    }
262    Ok((body_size, traces))
263}
264
/// Tags extracted from a tracer payload's traces, used to populate top level tracer payload fields.
///
/// Every field defaults to the empty string.
#[derive(Default)]
pub struct TracerPayloadTags {
    /// Becomes the `env` of the tracer payload.
    pub env: String,
    /// Becomes the `app_version` of the tracer payload.
    pub app_version: String,
    /// Becomes the `hostname` of the tracer payload.
    pub hostname: String,
    /// Becomes the `runtime_id` of the tracer payload.
    pub runtime_id: String,
}
273
274/// Returns the first non-empty value of `field` found in `trace`, searching the root span first
275/// then all other spans.
276fn search_trace_for_field(root: &pb::Span, trace: &[pb::Span], field: &str) -> Option<String> {
277    if let Some(v) = root.meta.get(field) {
278        if !v.is_empty() {
279            return Some(v.clone());
280        }
281    }
282    for span in trace {
283        if span.span_id == root.span_id {
284            continue;
285        }
286        if let Some(v) = span.meta.get(field) {
287            if !v.is_empty() {
288                return Some(v.clone());
289            }
290        }
291    }
292    None
293}
294
295pub(crate) fn construct_trace_chunk(trace: Vec<pb::Span>) -> pb::TraceChunk {
296    pb::TraceChunk {
297        priority: normalizer::SamplerPriority::None as i32,
298        origin: "".to_string(),
299        spans: trace,
300        tags: HashMap::new(),
301        dropped_trace: false,
302    }
303}
304
305pub(crate) fn construct_tracer_payload(
306    chunks: Vec<pb::TraceChunk>,
307    tracer_tags: &TracerHeaderTags,
308    tracer_payload_tags: TracerPayloadTags,
309) -> pb::TracerPayload {
310    pb::TracerPayload {
311        app_version: tracer_payload_tags.app_version,
312        language_name: tracer_tags.lang.to_string(),
313        container_id: tracer_tags.container_id.to_string(),
314        env: tracer_payload_tags.env,
315        runtime_id: tracer_payload_tags.runtime_id,
316        chunks,
317        hostname: tracer_payload_tags.hostname,
318        language_version: tracer_tags.lang_version.to_string(),
319        tags: HashMap::new(),
320        tracer_version: tracer_tags.tracer_version.to_string(),
321    }
322}
323
324pub(crate) fn cmp_send_data_payloads(a: &pb::TracerPayload, b: &pb::TracerPayload) -> Ordering {
325    a.tracer_version
326        .cmp(&b.tracer_version)
327        .then(a.language_version.cmp(&b.language_version))
328        .then(a.language_name.cmp(&b.language_name))
329        .then(a.hostname.cmp(&b.hostname))
330        .then(a.container_id.cmp(&b.container_id))
331        .then(a.runtime_id.cmp(&b.runtime_id))
332        .then(a.env.cmp(&b.env))
333        .then(a.app_version.cmp(&b.app_version))
334}
335
336pub fn coalesce_send_data(mut data: Vec<SendData>) -> Vec<SendData> {
337    // TODO trace payloads with identical data except for chunk could be merged?
338
339    data.sort_unstable_by(|a, b| {
340        a.get_target()
341            .url
342            .to_string()
343            .cmp(&b.get_target().url.to_string())
344            .then(a.get_target().test_token.cmp(&b.get_target().test_token))
345    });
346    data.dedup_by(|a, b| {
347        if a.get_target().url == b.get_target().url
348            && a.get_target().test_token == b.get_target().test_token
349        {
350            // Size is only an approximation. In practice it won't vary much, but be safe here.
351            // We also don't care about the exact maximum size, like two 25 MB or one 50 MB request
352            // has similar results. The primary goal here is avoiding many small requests.
353            // TODO: maybe make the MAX_PAYLOAD_SIZE configurable?
354            if a.size + b.size < MAX_PAYLOAD_SIZE / 2 {
355                // Note: dedup_by drops a, and retains b.
356                b.tracer_payloads.append(&mut a.tracer_payloads);
357                b.size += a.size;
358                return true;
359            }
360        }
361        false
362    });
363    // Merge chunks with common properties. Reduces requests for agentful mode.
364    // And reduces a little bit of data for agentless.
365    for send_data in data.iter_mut() {
366        send_data.tracer_payloads.merge();
367    }
368    data
369}
370
371pub fn get_root_span_index(trace: &[pb::Span]) -> anyhow::Result<usize> {
372    if trace.is_empty() {
373        anyhow::bail!("Cannot find root span index in an empty trace.");
374    }
375
376    // Do a first pass to find if we have an obvious root span (starting from the end) since some
377    // clients put the root span last.
378    for (i, span) in trace.iter().enumerate().rev() {
379        if span.parent_id == 0 {
380            return Ok(i);
381        }
382    }
383
384    let mut span_ids: HashSet<u64> = HashSet::with_capacity(trace.len());
385    for span in trace.iter() {
386        span_ids.insert(span.span_id);
387    }
388
389    let mut root_span_id = None;
390    for (i, span) in trace.iter().enumerate() {
391        // If a span's parent is not in the trace, it is a root
392        if !span_ids.contains(&span.parent_id) {
393            if root_span_id.is_some() {
394                debug!(
395                    trace_id = &trace[0].trace_id,
396                    "trace has multiple root spans"
397                );
398            }
399            root_span_id = Some(i);
400        }
401    }
402    Ok(match root_span_id {
403        Some(i) => i,
404        None => {
405            debug!(
406                trace_id = &trace[0].trace_id,
407                "Could not find the root span for trace"
408            );
409            trace.len() - 1
410        }
411    })
412}
413
414/// Updates all the spans top-level attribute.
415/// A span is considered top-level if:
416///   - it's a root span
417///   - OR its parent is unknown (other part of the code, distributed trace)
418///   - OR its parent belongs to another service (in that case it's a "local root" being the highest
419///     ancestor of other spans belonging to this service and attached to it).
420pub fn compute_top_level_span(trace: &mut [pb::Span]) {
421    let mut span_id_to_service: HashMap<u64, String> = HashMap::new();
422    for span in trace.iter() {
423        span_id_to_service.insert(span.span_id, span.service.clone());
424    }
425    for span in trace.iter_mut() {
426        if span.parent_id == 0 {
427            set_top_level_span(span, true);
428            continue;
429        }
430        match span_id_to_service.get(&span.parent_id) {
431            Some(parent_span_service) => {
432                if !parent_span_service.eq(&span.service) {
433                    // parent is not in the same service
434                    set_top_level_span(span, true)
435                }
436            }
437            None => {
438                // span has no parent in chunk
439                set_top_level_span(span, true)
440            }
441        }
442    }
443}
444
445/// Return true if the span has a top level key set
446pub fn has_top_level(span: &pb::Span) -> bool {
447    span.metrics
448        .get(TRACER_TOP_LEVEL_KEY)
449        .is_some_and(|v| *v == 1.0)
450        || span.metrics.get(TOP_LEVEL_KEY).is_some_and(|v| *v == 1.0)
451}
452
453fn set_top_level_span(span: &mut pb::Span, is_top_level: bool) {
454    if is_top_level {
455        span.metrics.insert(TOP_LEVEL_KEY.to_string(), 1.0);
456    } else {
457        span.metrics.remove(TOP_LEVEL_KEY);
458    }
459}
460
461pub fn set_serverless_root_span_tags(
462    span: &mut pb::Span,
463    app_name: Option<String>,
464    env_type: &EnvironmentType,
465) {
466    let origin_tag = match env_type {
467        EnvironmentType::CloudFunction => "cloudfunction",
468        EnvironmentType::AzureFunction => "azurefunction",
469        EnvironmentType::AzureSpringApp => "azurespringapp",
470        EnvironmentType::LambdaFunction => "lambda", // historical reasons
471    };
472    span.meta
473        .insert("_dd.origin".to_string(), origin_tag.to_string());
474    span.meta
475        .insert("origin".to_string(), origin_tag.to_string());
476
477    if let Some(function_name) = app_name {
478        match env_type {
479            EnvironmentType::CloudFunction
480            | EnvironmentType::AzureFunction
481            | EnvironmentType::LambdaFunction => {
482                span.meta.insert("functionname".to_string(), function_name);
483            }
484            _ => {}
485        }
486    }
487}
488
489fn update_tracer_top_level(span: &mut pb::Span) {
490    if span.metrics.contains_key(TRACER_TOP_LEVEL_KEY) {
491        span.metrics.insert(TOP_LEVEL_KEY.to_string(), 1.0);
492    }
493}
494
/// Serverless environment kinds; each one maps to an origin tag in
/// `set_serverless_root_span_tags`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum EnvironmentType {
    /// Tagged with origin `cloudfunction`.
    CloudFunction,
    /// Tagged with origin `azurefunction`.
    AzureFunction,
    /// Tagged with origin `azurespringapp`.
    AzureSpringApp,
    /// Tagged with origin `lambda`.
    LambdaFunction,
}
502
/// Optional environment metadata used to enrich spans.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct MiniAgentMetadata {
    /// Written to the `asa.hostname` span tag when set.
    pub azure_spring_app_hostname: Option<String>,
    /// Written to the `asa.name` span tag when set.
    pub azure_spring_app_name: Option<String>,
    /// Used for the `gcrfx.project_id` and `gcrfx.resource_name` span tags.
    pub gcp_project_id: Option<String>,
    /// Used for the `gcrfx.location` and `gcrfx.resource_name` span tags.
    pub gcp_region: Option<String>,
    /// Written to the `_dd.serverless_compat_version` span tag when set.
    pub version: Option<String>,
}
511
512impl Default for MiniAgentMetadata {
513    fn default() -> Self {
514        MiniAgentMetadata {
515            azure_spring_app_hostname: Default::default(),
516            azure_spring_app_name: Default::default(),
517            gcp_project_id: Default::default(),
518            gcp_region: Default::default(),
519            version: env::var("DD_SERVERLESS_COMPAT_VERSION").ok(),
520        }
521    }
522}
523
524pub fn enrich_span_with_mini_agent_metadata(
525    span: &mut pb::Span,
526    mini_agent_metadata: &MiniAgentMetadata,
527) {
528    if let Some(azure_spring_app_hostname) = &mini_agent_metadata.azure_spring_app_hostname {
529        span.meta.insert(
530            "asa.hostname".to_string(),
531            azure_spring_app_hostname.to_string(),
532        );
533    }
534    if let Some(azure_spring_app_name) = &mini_agent_metadata.azure_spring_app_name {
535        span.meta
536            .insert("asa.name".to_string(), azure_spring_app_name.to_string());
537    }
538    if let Some(serverless_compat_version) = &mini_agent_metadata.version {
539        span.meta.insert(
540            "_dd.serverless_compat_version".to_string(),
541            serverless_compat_version.to_string(),
542        );
543    }
544}
545
546pub fn enrich_span_with_google_cloud_function_metadata(
547    span: &mut pb::Span,
548    mini_agent_metadata: &MiniAgentMetadata,
549    function: Option<String>,
550) {
551    #[allow(clippy::todo)]
552    let Some(region) = &mini_agent_metadata.gcp_region
553    else {
554        todo!()
555    };
556    #[allow(clippy::todo)]
557    let Some(project) = &mini_agent_metadata.gcp_project_id
558    else {
559        todo!()
560    };
561
562    if let Some(function) = function {
563        if !region.is_empty() && !project.is_empty() {
564            let resource_name = format!(
565                "projects/{}/locations/{}/functions/{}",
566                project, region, function
567            );
568
569            span.meta
570                .insert("gcrfx.location".to_string(), region.to_string());
571            span.meta
572                .insert("gcrfx.project_id".to_string(), project.to_string());
573            span.meta
574                .insert("gcrfx.resource_name".to_string(), resource_name.to_string());
575        }
576    }
577}
578
579pub fn enrich_span_with_azure_function_metadata(span: &mut pb::Span) {
580    if span.name == "azure.apim" {
581        return;
582    }
583
584    if let Some(aas_metadata) = &*azure_app_services::AAS_METADATA_FUNCTION {
585        span.meta.extend(
586            aas_metadata
587                .get_function_tags()
588                .map(|(name, value)| (name.to_string(), value.to_string())),
589        );
590    }
591}
592
593pub fn collect_trace_chunks<T: TraceData>(
594    traces: Vec<Vec<crate::span::v04::Span<T>>>,
595    use_v05_format: bool,
596) -> anyhow::Result<TraceChunks<T>> {
597    if use_v05_format {
598        let mut shared_dict = SharedDict::default();
599        let mut v05_traces: Vec<Vec<v05::Span>> = Vec::with_capacity(traces.len());
600        for trace in traces {
601            let trace_len = trace.len();
602            let v05_trace = trace.into_iter().try_fold(
603                Vec::with_capacity(trace_len),
604                |mut acc, span| -> anyhow::Result<Vec<v05::Span>> {
605                    acc.push(v05::from_v04_span(span, &mut shared_dict)?);
606                    Ok(acc)
607                },
608            )?;
609
610            v05_traces.push(v05_trace);
611        }
612        Ok(TraceChunks::V05((shared_dict, v05_traces)))
613    } else {
614        Ok(TraceChunks::V04(traces))
615    }
616}
617
618pub fn collect_pb_trace_chunks<T: tracer_payload::TraceChunkProcessor>(
619    mut traces: Vec<Vec<pb::Span>>,
620    tracer_header_tags: &TracerHeaderTags,
621    process_chunk: &mut T,
622    is_agentless: bool,
623) -> anyhow::Result<TracerPayloadCollection> {
624    let mut trace_chunks: Vec<pb::TraceChunk> = Vec::new();
625
626    // We'll skip setting the global metadata and rely on the agent to unpack these
627    let mut tracer_payload_tags = TracerPayloadTags::default();
628
629    for trace in traces.iter_mut() {
630        if is_agentless {
631            if let Err(e) = normalizer::normalize_trace(trace) {
632                error!("Error normalizing trace: {e}");
633            }
634        }
635
636        let mut chunk = construct_trace_chunk(trace.to_vec());
637
638        let root_span_index = match get_root_span_index(trace) {
639            Ok(res) => res,
640            Err(e) => {
641                error!("Error getting the root span index of a trace, skipping. {e}");
642                continue;
643            }
644        };
645
646        if let Err(e) = normalizer::normalize_chunk(&mut chunk, root_span_index) {
647            error!("Error normalizing trace chunk: {e}");
648        }
649
650        for span in chunk.spans.iter_mut() {
651            // TODO: obfuscate & truncate spans
652            if tracer_header_tags.client_computed_top_level {
653                update_tracer_top_level(span);
654            }
655        }
656
657        if !tracer_header_tags.client_computed_top_level {
658            compute_top_level_span(&mut chunk.spans);
659        }
660
661        process_chunk.process(&mut chunk, root_span_index);
662
663        trace_chunks.push(chunk);
664
665        if is_agentless {
666            // Check each field independently so that a later trace can fill in fields missing
667            // from an earlier trace.
668            let root = &trace[root_span_index];
669            if tracer_payload_tags.env.is_empty() {
670                if let Some(mut v) = search_trace_for_field(root, trace, "env") {
671                    // Normalize env tag in case the span it was pulled from was skipped during
672                    // normalization
673                    libdd_trace_normalization::normalize_utils::normalize_tag(&mut v);
674                    if !v.is_empty() {
675                        tracer_payload_tags.env = v;
676                    }
677                }
678            }
679            if tracer_payload_tags.app_version.is_empty() {
680                if let Some(v) = search_trace_for_field(root, trace, "version") {
681                    tracer_payload_tags.app_version = v;
682                }
683            }
684            if tracer_payload_tags.hostname.is_empty() {
685                if let Some(v) = search_trace_for_field(root, trace, "_dd.hostname") {
686                    tracer_payload_tags.hostname = v;
687                }
688            }
689            if tracer_payload_tags.runtime_id.is_empty() {
690                if let Some(v) = search_trace_for_field(root, trace, "runtime-id") {
691                    tracer_payload_tags.runtime_id = v;
692                }
693            }
694        }
695    }
696
697    Ok(TracerPayloadCollection::V07(vec![
698        construct_tracer_payload(trace_chunks, tracer_header_tags, tracer_payload_tags),
699    ]))
700}
701
702/// Returns true if a span should be measured (i.e., it should get trace metrics calculated).
703pub fn is_measured(span: &pb::Span) -> bool {
704    span.metrics.get(MEASURED_KEY).is_some_and(|v| *v == 1.0)
705}
706
707/// Returns true if the span is a partial snapshot.
708/// This kind of spans are partial images of long-running spans.
709/// When incomplete, a partial snapshot has a metric _dd.partial_version which is a positive
710/// integer. The metric usually increases each time a new version of the same span is sent by the
711/// tracer
712pub fn is_partial_snapshot(span: &pb::Span) -> bool {
713    span.metrics
714        .get(PARTIAL_VERSION_KEY)
715        .is_some_and(|v| *v >= 0.0)
716}
717
718#[cfg(test)]
719mod tests {
720    use super::*;
721    use crate::{
722        span::SharedDictBytes,
723        test_utils::{create_test_no_alloc_span, create_test_span},
724    };
725    use http::Request;
726    use libdd_common::{http_common, Endpoint};
727    use serde_json::json;
728
729    fn find_index_in_dict(dict: &SharedDictBytes, value: &str) -> Option<u32> {
730        let idx = dict.iter().position(|e| e.as_str() == value);
731        idx.map(|idx| idx.try_into().unwrap())
732    }
733
    /// Coalescing five identical-target payloads must merge some of them, but not all.
    #[test]
    fn test_coalescing_does_not_exceed_max_size() {
        // Each dummy is slightly larger than a fifth of the maximum payload size: two of them
        // stay under the merge threshold (half the maximum), three do not.
        fn dummy() -> SendData {
            SendData::new(
                MAX_PAYLOAD_SIZE / 5 + 1,
                TracerPayloadCollection::V07(vec![pb::TracerPayload {
                    container_id: "".to_string(),
                    language_name: "".to_string(),
                    language_version: "".to_string(),
                    tracer_version: "".to_string(),
                    runtime_id: "".to_string(),
                    chunks: vec![pb::TraceChunk {
                        priority: 0,
                        origin: "".to_string(),
                        spans: vec![],
                        tags: Default::default(),
                        dropped_trace: false,
                    }],
                    tags: Default::default(),
                    env: "".to_string(),
                    hostname: "".to_string(),
                    app_version: "".to_string(),
                }]),
                TracerHeaderTags::default(),
                &Endpoint::default(),
            )
        }
        let coalesced = coalesce_send_data(vec![dummy(), dummy(), dummy(), dummy(), dummy()]);
        // The total reported by `size()` must still be 5 after coalescing
        // (presumably one per input chunk, i.e. nothing was lost).
        assert_eq!(
            5,
            coalesced
                .iter()
                .map(|s| s.tracer_payloads.size())
                .sum::<usize>()
        );
        // assert some chunks are actually coalesced
        assert!(
            coalesced
                .iter()
                .map(|s| {
                    if let TracerPayloadCollection::V07(collection) = &s.tracer_payloads {
                        collection.iter().map(|s| s.chunks.len()).max().unwrap()
                    } else {
                        0
                    }
                })
                .max()
                .unwrap()
                > 1
        );
        // Some entries were merged, but not all into a single one.
        assert!(coalesced.len() > 1 && coalesced.len() < 5);
    }
786
    /// Round-trips a hand-built v0.5 payload through `get_v05_traces_from_request_body` and
    /// checks that every dictionary index is resolved to the expected string.
    #[tokio::test]
    #[allow(clippy::type_complexity)]
    #[cfg_attr(all(miri, target_os = "macos"), ignore)]
    async fn test_get_v05_traces_from_request_body() {
        // Payload shape: (string dictionary, traces). Each span is a 12-element tuple in which
        // strings are replaced by their index into the dictionary.
        let data: (
            Vec<String>,
            Vec<
                Vec<(
                    u8,
                    u8,
                    u8,
                    u64,
                    u64,
                    u64,
                    i64,
                    i64,
                    i32,
                    HashMap<u8, u8>,
                    HashMap<u8, f64>,
                    u8,
                )>,
            >,
        ) = (
            vec![
                "baggage".to_string(),
                "item".to_string(),
                "elasticsearch.version".to_string(),
                "7.0".to_string(),
                "my-name".to_string(),
                "X".to_string(),
                "my-service".to_string(),
                "my-resource".to_string(),
                "_dd.sampling_rate_whatever".to_string(),
                "value whatever".to_string(),
                "sql".to_string(),
            ],
            // Field order: service, name, resource, trace_id, span_id, parent_id, start,
            // duration, error, meta, metrics, type.
            vec![vec![(
                6,
                4,
                7,
                1,
                2,
                3,
                123,
                456,
                1,
                HashMap::from([(8, 9), (0, 1), (2, 3)]),
                HashMap::from([(5, 1.2)]),
                10,
            )]],
        );
        let bytes = rmp_serde::to_vec(&data).unwrap();
        let res = get_v05_traces_from_request_body(http_common::Body::from(bytes)).await;
        assert!(res.is_ok());
        let (_, traces) = res.unwrap();
        let span = traces[0][0].clone();
        // The same span with every dictionary index replaced by its string.
        let test_span = pb::Span {
            service: "my-service".to_string(),
            name: "my-name".to_string(),
            resource: "my-resource".to_string(),
            trace_id: 1,
            span_id: 2,
            parent_id: 3,
            start: 123,
            duration: 456,
            error: 1,
            meta: HashMap::from([
                ("baggage".to_string(), "item".to_string()),
                ("elasticsearch.version".to_string(), "7.0".to_string()),
                (
                    "_dd.sampling_rate_whatever".to_string(),
                    "value whatever".to_string(),
                ),
            ]),
            metrics: HashMap::from([("X".to_string(), 1.2)]),
            meta_struct: HashMap::default(),
            r#type: "sql".to_string(),
            span_links: vec![],
            span_events: vec![],
        };
        assert_eq!(span, test_span);
    }
869
870    #[tokio::test]
871    #[cfg_attr(miri, ignore)]
872    async fn test_get_traces_from_request_body() {
873        let pairs = vec![
874            (
875                json!([{
876                    "service": "test-service",
877                    "name": "test-service-name",
878                    "resource": "test-service-resource",
879                    "trace_id": 111,
880                    "span_id": 222,
881                    "parent_id": 333,
882                    "start": 1,
883                    "duration": 5,
884                    "error": 0,
885                    "meta": {},
886                    "metrics": {},
887                }]),
888                vec![vec![pb::Span {
889                    service: "test-service".to_string(),
890                    name: "test-service-name".to_string(),
891                    resource: "test-service-resource".to_string(),
892                    trace_id: 111,
893                    span_id: 222,
894                    parent_id: 333,
895                    start: 1,
896                    duration: 5,
897                    error: 0,
898                    meta: HashMap::new(),
899                    metrics: HashMap::new(),
900                    meta_struct: HashMap::new(),
901                    r#type: "".to_string(),
902                    span_links: vec![],
903                    span_events: vec![],
904                }]],
905            ),
906            (
907                json!([{
908                    "name": "test-service-name",
909                    "resource": "test-service-resource",
910                    "trace_id": 111,
911                    "span_id": 222,
912                    "start": 1,
913                    "duration": 5,
914                    "meta": {},
915                }]),
916                vec![vec![pb::Span {
917                    service: "".to_string(),
918                    name: "test-service-name".to_string(),
919                    resource: "test-service-resource".to_string(),
920                    trace_id: 111,
921                    span_id: 222,
922                    parent_id: 0,
923                    start: 1,
924                    duration: 5,
925                    error: 0,
926                    meta: HashMap::new(),
927                    metrics: HashMap::new(),
928                    meta_struct: HashMap::new(),
929                    r#type: "".to_string(),
930                    span_links: vec![],
931                    span_events: vec![],
932                }]],
933            ),
934        ];
935
936        for (trace_input, output) in pairs {
937            let bytes = rmp_serde::to_vec(&vec![&trace_input]).unwrap();
938            let request = Request::builder()
939                .body(http_common::Body::from(bytes))
940                .unwrap();
941            let res = get_traces_from_request_body(request.into_body()).await;
942            assert!(res.is_ok());
943            assert_eq!(res.unwrap().1, output);
944        }
945    }
946
947    #[tokio::test]
948    #[cfg_attr(miri, ignore)]
949    async fn test_get_traces_from_request_body_with_span_links() {
950        let trace_input = json!([[{
951            "service": "test-service",
952            "name": "test-name",
953            "resource": "test-resource",
954            "trace_id": 111,
955            "span_id": 222,
956            "parent_id": 333,
957            "start": 1,
958            "duration": 5,
959            "error": 0,
960            "meta": {},
961            "metrics": {},
962            "span_links": [{
963                "trace_id": 999,
964                "span_id": 888,
965                "trace_id_high": 777,
966                "attributes": {"key": "value"},
967                "tracestate": "vendor=value"
968                // flags field intentionally omitted
969            }]
970        }]]);
971
972        let expected_output = vec![vec![pb::Span {
973            service: "test-service".to_string(),
974            name: "test-name".to_string(),
975            resource: "test-resource".to_string(),
976            trace_id: 111,
977            span_id: 222,
978            parent_id: 333,
979            start: 1,
980            duration: 5,
981            error: 0,
982            meta: HashMap::new(),
983            metrics: HashMap::new(),
984            meta_struct: HashMap::new(),
985            r#type: String::new(),
986            span_links: vec![pb::SpanLink {
987                trace_id: 999,
988                span_id: 888,
989                trace_id_high: 777,
990                attributes: HashMap::from([("key".to_string(), "value".to_string())]),
991                tracestate: "vendor=value".to_string(),
992                flags: 0, // Should default to 0 when omitted
993            }],
994            span_events: vec![],
995        }]];
996
997        let bytes = rmp_serde::to_vec(&trace_input).unwrap();
998        let request = Request::builder()
999            .body(http_common::Body::from(bytes))
1000            .unwrap();
1001
1002        let res = get_traces_from_request_body(request.into_body()).await;
1003        assert!(res.is_ok(), "Failed to deserialize: {res:?}");
1004        assert_eq!(res.unwrap().1, expected_output);
1005    }
1006
#[test]
fn test_get_root_span_index_from_complete_trace() {
    // (span_id, parent_id) pairs forming a chain; the first span has parent_id 0
    // and is expected to be reported as the root.
    let trace: Vec<_> = [(12341, 0), (12342, 12341), (12343, 12342)]
        .iter()
        .map(|&(span_id, parent_id)| create_test_span(1234, span_id, parent_id, 1, false))
        .collect();

    let index = get_root_span_index(&trace);
    assert!(index.is_ok());
    assert_eq!(index.unwrap(), 0);
}
1019
#[test]
fn test_get_root_span_index_from_partial_trace() {
    // (span_id, parent_id) pairs. The second span (index 1) is the root: its
    // parent, 12340, is not part of the trace.
    let trace: Vec<_> = [(12342, 12341), (12341, 12340), (12343, 12342)]
        .iter()
        .map(|&(span_id, parent_id)| create_test_span(1234, span_id, parent_id, 1, false))
        .collect();

    let index = get_root_span_index(&trace);
    assert!(index.is_ok());
    assert_eq!(index.unwrap(), 1);
}
1033
#[test]
fn test_set_serverless_root_span_tags_azure_function() {
    let mut span = create_test_span(1234, 12342, 12341, 1, false);
    let function_name = Some("test_function".to_string());

    set_serverless_root_span_tags(&mut span, function_name, &EnvironmentType::AzureFunction);

    // Full meta map expected after tagging: the entries asserted here include both
    // origin tags and the function name alongside runtime-id, env and service.
    let expected_meta: HashMap<String, String> = [
        ("runtime-id", "test-runtime-id-value"),
        ("_dd.origin", "azurefunction"),
        ("origin", "azurefunction"),
        ("functionname", "test_function"),
        ("env", "test-env"),
        ("service", "test-service"),
    ]
    .iter()
    .map(|&(key, value)| (key.to_string(), value.to_string()))
    .collect();
    assert_eq!(span.meta, expected_meta);
}
1057
#[test]
fn test_set_serverless_root_span_tags_cloud_function() {
    let mut span = create_test_span(1234, 12342, 12341, 1, false);
    let function_name = Some("test_function".to_string());

    set_serverless_root_span_tags(&mut span, function_name, &EnvironmentType::CloudFunction);

    // Full meta map expected after tagging: the entries asserted here include both
    // origin tags and the function name alongside runtime-id, env and service.
    let expected_meta: HashMap<String, String> = [
        ("runtime-id", "test-runtime-id-value"),
        ("_dd.origin", "cloudfunction"),
        ("origin", "cloudfunction"),
        ("functionname", "test_function"),
        ("env", "test-env"),
        ("service", "test-service"),
    ]
    .iter()
    .map(|&(key, value)| (key.to_string(), value.to_string()))
    .collect();
    assert_eq!(span.meta, expected_meta);
}
1081
#[test]
fn test_has_top_level() {
    // Two otherwise identical spans that differ only in the `is_top_level` flag
    // passed to the test-span factory.
    let flagged = create_test_span(123, 1234, 12, 1, true);
    let unflagged = create_test_span(123, 1234, 12, 1, false);

    assert!(has_top_level(&flagged));
    assert!(!has_top_level(&unflagged));
}
1089
#[test]
fn test_is_measured() {
    // Baseline span without the measured metric.
    let plain = create_test_span(123, 1234, 12, 1, true);

    // Same span, with the measured metric set to 1.0.
    let mut measured = create_test_span(123, 1234, 12, 1, true);
    measured.metrics.insert(MEASURED_KEY.to_string(), 1.0);

    assert!(is_measured(&measured));
    assert!(!is_measured(&plain));
}
1098
#[test]
fn test_compute_top_level() {
    // Span 5 is a child of span 2 but reports a different service name.
    let mut other_service_span = create_test_span(123, 5, 2, 1, false);
    other_service_span.service = "another_service".into();

    let mut trace = vec![
        // Root span, should be marked as top-level
        create_test_span(123, 1, 0, 1, false),
        // Should not be marked as top-level
        create_test_span(123, 2, 1, 1, false),
        // No parent in local trace, should be marked as top-level
        create_test_span(123, 4, 3, 1, false),
        // Parent belongs to another service, should be marked as top-level
        other_service_span,
    ];

    compute_top_level_span(trace.as_mut_slice());

    // Collect, in trace order, the ids of every span that ended up top-level.
    let top_level_ids: Vec<u64> = trace
        .iter()
        .filter(|&span| has_top_level(span))
        .map(|span| span.span_id)
        .collect();
    assert_eq!(top_level_ids, [1, 4, 5]);
}
1130
#[test]
fn test_collect_trace_chunks_v05() {
    let chunk = vec![create_test_no_alloc_span(123, 456, 789, 1, true)];

    let collection = collect_trace_chunks(vec![chunk], true).unwrap();

    // Requesting v0.5 output must yield the (dictionary, traces) pair.
    let TraceChunks::V05((dict, traces)) = collection else {
        panic!("Unexpected type");
    };

    assert_eq!(dict.len(), 16);

    // String fields are stored as indices into `dict`; numeric fields are kept as-is.
    let span = &traces[0][0];
    assert_eq!(span.service, 1);
    assert_eq!(span.name, 2);
    assert_eq!(span.resource, 3);
    assert_eq!(span.trace_id, 123);
    assert_eq!(span.span_id, 456);
    assert_eq!(span.parent_id, 789);
    assert_eq!(span.start, 1);
    // NOTE(review): `span.error` used to be asserted twice in a row. The duplicate
    // was presumably meant to check `span.duration`, which is currently not
    // asserted; confirm the fixture's duration and add that assertion.
    assert_eq!(span.error, 0);
    assert_eq!(span.r#type, 15);

    // Every meta entry must map the dictionary index of its key to the dictionary
    // index of its value.
    for (key, value) in [
        ("service", "test-service"),
        ("env", "test-env"),
        ("runtime-id", "test-runtime-id-value"),
        ("_dd.origin", "cloudfunction"),
        ("origin", "cloudfunction"),
        ("functionname", "dummy_function_name"),
    ] {
        assert_eq!(
            *span
                .meta
                .get(&find_index_in_dict(&dict, key).unwrap())
                .unwrap(),
            find_index_in_dict(&dict, value).unwrap(),
            "unexpected dictionary index for meta key {key:?}"
        );
    }

    assert_eq!(
        *span
            .metrics
            .get(&find_index_in_dict(&dict, "_top_level").unwrap())
            .unwrap(),
        1.0
    );
}
1205
#[test]
fn test_rmp_serde_deserialize_meta_with_null_values() {
    // A span whose `meta` map carries a null entry next to ordinary string tags.
    let span_with_null_meta = json!({
        "service": "test-service",
        "name": "test_name",
        "resource": "test-resource",
        "trace_id": 1_u64,
        "span_id": 2_u64,
        "parent_id": 0_u64,
        "start": 0_i64,
        "duration": 5_i64,
        "error": 0_i32,
        "meta": {
            "service": "test-service",
            "env": "test-env",
            "runtime-id": "test-runtime-id-value",
            "problematic_key": null  // Ensure this null value does not cause an error
        },
        "metrics": {},
        "type": "",
        "meta_struct": {},
        "span_links": [],
        "span_events": []
    });

    // Round-trip through MessagePack: one trace containing the one span.
    let encoded = rmp_serde::to_vec(&vec![vec![span_with_null_meta]]).unwrap();
    let decoded: Vec<Vec<pb::Span>> = rmp_serde::from_read(encoded.as_slice())
        .expect("Failed to deserialize traces with null values in meta");

    assert_eq!(1, decoded.len());
    assert_eq!(1, decoded[0].len());
    let span = &decoded[0][0];

    assert_eq!("test-service", span.service);
    assert_eq!("test_name", span.name);
    assert_eq!("test-resource", span.resource);

    // The non-null meta entries must survive unchanged.
    for (key, value) in [
        ("service", "test-service"),
        ("env", "test-env"),
        ("runtime-id", "test-runtime-id-value"),
    ] {
        assert_eq!(value, span.meta.get(key).unwrap());
    }

    // Assert that the null value was filtered out (key not present in map)
    assert!(
        !span.meta.contains_key("problematic_key"),
        "Null value should be skipped, but key was present"
    );
}
1256
#[test]
fn test_enrich_span_with_azure_function_metadata_adds_tags_for_non_apim() {
    // Every `aas.*` tag the enrichment is expected to add.
    const AAS_TAGS: [&str; 12] = [
        "aas.resource.id",
        "aas.environment.instance_id",
        "aas.environment.instance_name",
        "aas.subscription.id",
        "aas.environment.os",
        "aas.environment.runtime",
        "aas.environment.runtime_version",
        "aas.environment.function_runtime",
        "aas.resource.group",
        "aas.site.name",
        "aas.site.kind",
        "aas.site.type",
    ];

    let mut span = create_test_span(1234, 12342, 12341, 1, false);
    span.name = "azure.function".to_string();

    enrich_span_with_azure_function_metadata(&mut span);

    // The tags can only be checked when AAS_METADATA_FUNCTION is available. When it
    // is not (most test environments) nothing is asserted, so this test mainly
    // guards against non-apim spans being skipped.
    if azure_app_services::AAS_METADATA_FUNCTION.is_some() {
        for tag in AAS_TAGS {
            assert!(span.meta.contains_key(tag));
        }
    }
}
1282
#[test]
fn test_enrich_span_with_azure_function_metadata_skips_azure_apim() {
    // Every `aas.*` tag that must be absent from an `azure.apim` span.
    const AAS_TAGS: [&str; 12] = [
        "aas.resource.id",
        "aas.environment.instance_id",
        "aas.environment.instance_name",
        "aas.subscription.id",
        "aas.environment.os",
        "aas.environment.runtime",
        "aas.environment.runtime_version",
        "aas.environment.function_runtime",
        "aas.resource.group",
        "aas.site.name",
        "aas.site.kind",
        "aas.site.type",
    ];

    let mut span = create_test_span(1234, 12342, 12341, 1, false);
    span.name = "azure.apim".to_string();

    enrich_span_with_azure_function_metadata(&mut span);

    // Verify no aas.* tags were added
    for tag in AAS_TAGS {
        assert!(!span.meta.contains_key(tag));
    }
}
1304
#[test]
fn test_collect_pb_trace_chunks_searches_multiple_root_spans_for_fields() {
    // The root span of the first trace carries none of the payload-level fields;
    // the root span of the second trace carries all of them and should be the
    // one that populates the payload.
    let mut bare_root = create_test_span(1, 1, 0, 1, true);
    for key in ["env", "runtime-id"] {
        bare_root.meta.remove(key);
    }

    let mut tagged_root = create_test_span(2, 3, 0, 1, true);
    for (key, value) in [
        ("version", "1.2.3"),
        ("env", "prod"),
        ("_dd.hostname", "my-host"),
        ("runtime-id", "123"),
    ] {
        tagged_root.meta.insert(key.to_string(), value.to_string());
    }

    let traces = vec![vec![bare_root], vec![tagged_root]];
    let result = collect_pb_trace_chunks(
        traces,
        &TracerHeaderTags::default(),
        &mut tracer_payload::DefaultTraceChunkProcessor,
        true,
    )
    .unwrap();

    let TracerPayloadCollection::V07(payloads) = result else {
        panic!("expected TracerPayloadCollection::V07");
    };
    let payload = &payloads[0];
    assert_eq!(payload.app_version, "1.2.3");
    assert_eq!(payload.env, "prod");
    assert_eq!(payload.hostname, "my-host");
    assert_eq!(payload.runtime_id, "123");
}
1343
#[test]
fn test_collect_pb_trace_chunks_searches_non_root_spans_for_fields() {
    // The root span carries none of the payload-level fields while its child
    // carries all of them, so the child should populate the payload.
    let mut root_span = create_test_span(1, 1, 0, 1, true);
    for key in ["env", "runtime-id"] {
        root_span.meta.remove(key);
    }

    let mut child_span = create_test_span(1, 2, 1, 1, false);
    for (key, value) in [
        ("version", "1.2.3"),
        ("env", "prod"),
        ("_dd.hostname", "my-host"),
        ("runtime-id", "123"),
    ] {
        child_span.meta.insert(key.to_string(), value.to_string());
    }

    let traces = vec![vec![root_span, child_span]];
    let result = collect_pb_trace_chunks(
        traces,
        &TracerHeaderTags::default(),
        &mut tracer_payload::DefaultTraceChunkProcessor,
        true,
    )
    .unwrap();

    let TracerPayloadCollection::V07(payloads) = result else {
        panic!("expected TracerPayloadCollection::V07");
    };
    let payload = &payloads[0];
    assert_eq!(payload.app_version, "1.2.3");
    assert_eq!(payload.env, "prod");
    assert_eq!(payload.hostname, "my-host");
    assert_eq!(payload.runtime_id, "123");
}
1381
#[test]
fn test_collect_pb_trace_chunks_root_span_takes_priority_over_child() {
    // Root and child both carry every payload-level field, with different values.
    // The values from the root span should be the ones that populate the payload.
    let mut root_span = create_test_span(1, 1, 0, 1, true);
    for (key, value) in [
        ("version", "root-version"),
        ("env", "root-env"),
        ("_dd.hostname", "root-host"),
        ("runtime-id", "root-runtime-id"),
    ] {
        root_span.meta.insert(key.to_string(), value.to_string());
    }

    let mut child_span = create_test_span(1, 2, 1, 1, false);
    for (key, value) in [
        ("version", "child-version"),
        ("env", "child-env"),
        ("_dd.hostname", "child-host"),
        ("runtime-id", "child-runtime-id"),
    ] {
        child_span.meta.insert(key.to_string(), value.to_string());
    }

    let traces = vec![vec![root_span, child_span]];
    let result = collect_pb_trace_chunks(
        traces,
        &TracerHeaderTags::default(),
        &mut tracer_payload::DefaultTraceChunkProcessor,
        true,
    )
    .unwrap();

    let TracerPayloadCollection::V07(payloads) = result else {
        panic!("expected TracerPayloadCollection::V07");
    };
    let payload = &payloads[0];
    assert_eq!(payload.app_version, "root-version");
    assert_eq!(payload.env, "root-env");
    assert_eq!(payload.hostname, "root-host");
    assert_eq!(payload.runtime_id, "root-runtime-id");
}
1430
#[test]
fn test_collect_pb_trace_chunks_skips_empty_root_span_value() {
    // The root span has every payload-level field set to an empty string, while
    // the child has real values. The child's values should populate the payload.
    let mut root_span = create_test_span(1, 1, 0, 1, true);
    for key in ["version", "env", "_dd.hostname", "runtime-id"] {
        root_span.meta.insert(key.to_string(), "".to_string());
    }

    let mut child_span = create_test_span(1, 2, 1, 1, false);
    for (key, value) in [
        ("version", "1.2.3"),
        ("env", "prod"),
        ("_dd.hostname", "my-host"),
        ("runtime-id", "123"),
    ] {
        child_span.meta.insert(key.to_string(), value.to_string());
    }

    let traces = vec![vec![root_span, child_span]];
    let result = collect_pb_trace_chunks(
        traces,
        &TracerHeaderTags::default(),
        &mut tracer_payload::DefaultTraceChunkProcessor,
        true,
    )
    .unwrap();

    let TracerPayloadCollection::V07(payloads) = result else {
        panic!("expected TracerPayloadCollection::V07");
    };
    let payload = &payloads[0];
    assert_eq!(payload.app_version, "1.2.3");
    assert_eq!(payload.env, "prod");
    assert_eq!(payload.hostname, "my-host");
    assert_eq!(payload.runtime_id, "123");
}
1475
#[test]
fn test_collect_pb_trace_chunks_normalizes_env() {
    // An upper-case env on the root span is expected to come out lower-cased on
    // the payload.
    let mut root_span = create_test_span(1, 1, 0, 1, true);
    root_span
        .meta
        .insert("env".to_string(), "PRODUCTION".to_string());

    let traces = vec![vec![root_span]];
    let result = collect_pb_trace_chunks(
        traces,
        &TracerHeaderTags::default(),
        &mut tracer_payload::DefaultTraceChunkProcessor,
        true,
    )
    .unwrap();

    let TracerPayloadCollection::V07(payloads) = result else {
        panic!("expected TracerPayloadCollection::V07");
    };
    let payload = &payloads[0];
    assert_eq!(payload.env, "production");
}
1495
#[test]
fn test_collect_pb_trace_chunks_skips_env_empty_after_normalization() {
    // The first root span has an env that normalizes to empty (all invalid
    // characters). The second root span has a valid env, which should be the one
    // that populates the payload's env field.
    let mut invalid_env_root = create_test_span(1, 1, 0, 1, true);
    invalid_env_root
        .meta
        .insert("env".to_string(), "!!!".to_string());

    let mut valid_env_root = create_test_span(2, 3, 0, 1, true);
    valid_env_root
        .meta
        .insert("env".to_string(), "prod".to_string());

    let traces = vec![vec![invalid_env_root], vec![valid_env_root]];
    let result = collect_pb_trace_chunks(
        traces,
        &TracerHeaderTags::default(),
        &mut tracer_payload::DefaultTraceChunkProcessor,
        true,
    )
    .unwrap();

    let TracerPayloadCollection::V07(payloads) = result else {
        panic!("expected TracerPayloadCollection::V07");
    };
    let payload = &payloads[0];
    assert_eq!(payload.env, "prod");
}
1523
#[test]
fn test_search_trace_for_field_skips_span_with_same_id_as_root() {
    // A span with the same span_id as the root is treated as the root and skipped
    // in the child span search. Only the root span's own meta is checked for it.
    let mut root = create_test_span(1, 1, 0, 1, true);
    root.meta.remove("version");

    // Shares the root's span_id, so its "version" tag should be ignored.
    let mut same_id_span = create_test_span(1, 1, 0, 1, false);
    same_id_span
        .meta
        .insert("version".to_string(), "should-not-appear".to_string());

    let trace = vec![root.clone(), same_id_span];
    let found = search_trace_for_field(&root, &trace, "version");
    assert_eq!(found, None);
}
1540}