Skip to main content

libdd_trace_utils/
trace_utils.rs

1// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4pub use crate::send_data::send_data_result::SendDataResult;
5pub use crate::send_data::SendData;
6use crate::span::v05::dict::SharedDict;
7use crate::span::{v05, TraceData};
8pub use crate::tracer_header_tags::TracerHeaderTags;
9use crate::tracer_payload::TracerPayloadCollection;
10use crate::tracer_payload::{self, TraceChunks, TraceEncoding};
11use anyhow::anyhow;
12use bytes::buf::Reader;
13use bytes::Buf;
14use http_body_util::BodyExt;
15use libdd_common::azure_app_services;
16use libdd_trace_normalization::normalizer;
17use libdd_trace_protobuf::pb;
18use rmp::decode::read_array_len;
19use rmpv::decode::read_value;
20use rmpv::{Integer, Value};
21use std::cmp::Ordering;
22use std::collections::{HashMap, HashSet};
23use std::env;
24use tracing::{debug, error};
25
/// The maximum payload size for a single request that can be sent to the trace agent. Payloads
/// larger than this size will be dropped and the agent will return a 413 error if
/// `datadog-send-real-http-status` is set.
pub const MAX_PAYLOAD_SIZE: usize = 25 * 1024 * 1024;
/// Span metric the mini agent must set for the backend to recognize top level span
const TOP_LEVEL_KEY: &str = "_top_level";
/// Span metric the tracer sets to denote a top level span
const TRACER_TOP_LEVEL_KEY: &str = "_dd.top_level";
/// Span metric checked by `is_measured`; a value of `1.0` marks the span as measured.
const MEASURED_KEY: &str = "_dd.measured";
/// Span metric checked by `is_partial_snapshot` to detect partial snapshots.
const PARTIAL_VERSION_KEY: &str = "_dd.partial_version";
/// Largest number of entries accepted in a v0.5 strings dictionary.
const MAX_STRING_DICT_SIZE: u32 = 25_000_000;
/// Number of elements in the MessagePack array that encodes a single v0.5 span.
const SPAN_ELEMENT_COUNT: usize = 12;
38
39/// First value of returned tuple is the payload size
40pub async fn get_traces_from_request_body<B>(body: B) -> anyhow::Result<(usize, Vec<Vec<pb::Span>>)>
41where
42    B: http_body::Body,
43    B::Error: std::error::Error + Send + Sync + 'static,
44{
45    let buffer = body.collect().await?.aggregate();
46    let size = buffer.remaining();
47
48    let traces: Vec<Vec<pb::Span>> = match rmp_serde::from_read(buffer.reader()) {
49        Ok(res) => res,
50        Err(err) => {
51            anyhow::bail!("Error deserializing trace from request body: {err}")
52        }
53    };
54
55    Ok((size, traces))
56}
57
58#[inline]
59fn get_v05_strings_dict(reader: &mut Reader<impl Buf>) -> anyhow::Result<Vec<String>> {
60    let dict_size =
61        read_array_len(reader).map_err(|err| anyhow!("Error reading dict size: {err}"))?;
62    if dict_size > MAX_STRING_DICT_SIZE {
63        anyhow::bail!(
64            "Error deserializing strings dictionary. Dict size is too large: {dict_size}"
65        );
66    }
67    let mut dict: Vec<String> = Vec::with_capacity(dict_size.try_into()?);
68    for _ in 0..dict_size {
69        match read_value(reader)? {
70            Value::String(s) => {
71                let parsed_string = s.into_str().ok_or_else(|| anyhow!("Error reading string dict"))?;
72                dict.push(parsed_string);
73            }
74            val => anyhow::bail!("Error deserializing strings dictionary. Value in string dict is not a string: {val}")
75        }
76    }
77    Ok(dict)
78}
79
80#[inline]
81fn get_v05_span(reader: &mut Reader<impl Buf>, dict: &[String]) -> anyhow::Result<pb::Span> {
82    let mut span: pb::Span = Default::default();
83    let span_size = rmp::decode::read_array_len(reader)
84        .map_err(|err| anyhow!("Error reading span size: {err}"))? as usize;
85    if span_size != SPAN_ELEMENT_COUNT {
86        anyhow::bail!("Expected an array of exactly 12 elements in a span, got {span_size}");
87    }
88    // 0 - service
89    span.service = get_v05_string(reader, dict, "service")?;
90    // 1 - name
91    span.name = get_v05_string(reader, dict, "name")?;
92    // 2 - resource
93    span.resource = get_v05_string(reader, dict, "resource")?;
94
95    // 3 - trace_id
96    match read_value(reader)? {
97        Value::Integer(i) => {
98            span.trace_id = i.as_u64().ok_or_else(|| {
99                anyhow!("Error reading span trace_id, value is not an integer: {i}")
100            })?;
101        }
102        val => anyhow::bail!("Error reading span trace_id, value is not an integer: {val}"),
103    };
104    // 4 - span_id
105    match read_value(reader)? {
106        Value::Integer(i) => {
107            span.span_id = i.as_u64().ok_or_else(|| {
108                anyhow!("Error reading span span_id, value is not an integer: {i}")
109            })?;
110        }
111        val => anyhow::bail!("Error reading span span_id, value is not an integer: {val}"),
112    };
113    // 5 - parent_id
114    match read_value(reader)? {
115        Value::Integer(i) => {
116            span.parent_id = i.as_u64().ok_or_else(|| {
117                anyhow!("Error reading span parent_id, value is not an integer: {i}")
118            })?;
119        }
120        val => anyhow::bail!("Error reading span parent_id, value is not an integer: {val}"),
121    };
122    // 6 - start
123    match read_value(reader)? {
124        Value::Integer(i) => {
125            span.start = i
126                .as_i64()
127                .ok_or_else(|| anyhow!("Error reading span start, value is not an integer: {i}"))?;
128        }
129        val => anyhow::bail!("Error reading span start, value is not an integer: {val}"),
130    };
131    // 7 - duration
132    match read_value(reader)? {
133        Value::Integer(i) => {
134            span.duration = i.as_i64().ok_or_else(|| {
135                anyhow!("Error reading span duration, value is not an integer: {i}")
136            })?;
137        }
138        val => anyhow::bail!("Error reading span duration, value is not an integer: {val}"),
139    };
140    // 8 - error
141    match read_value(reader)? {
142        Value::Integer(i) => {
143            span.error = i
144                .as_i64()
145                .ok_or_else(|| anyhow!("Error reading span error, value is not an integer: {i}"))?
146                as i32;
147        }
148        val => anyhow::bail!("Error reading span error, value is not an integer: {val}"),
149    }
150    // 9 - meta
151    match read_value(reader)? {
152        Value::Map(meta) => {
153            for (k, v) in meta.iter() {
154                match k {
155                    Value::Integer(k) => {
156                        match v {
157                            Value::Integer(v) => {
158                                let key = str_from_dict(dict, *k)?;
159                                let val = str_from_dict(dict, *v)?;
160                                span.meta.insert(key, val);
161                            }
162                            _ => anyhow::bail!("Error reading span meta, value is not an integer and can't be looked up in dict: {v}")
163                        }
164                    }
165                    _ => anyhow::bail!("Error reading span meta, key is not an integer and can't be looked up in dict: {k}")
166                }
167            }
168        }
169        val => anyhow::bail!("Error reading span meta, value is not a map: {val}"),
170    }
171    // 10 - metrics
172    match read_value(reader)? {
173        Value::Map(metrics) => {
174            for (k, v) in metrics.iter() {
175                match k {
176                    Value::Integer(k) => {
177                        match v {
178                            Value::Integer(v) => {
179                                let key = str_from_dict(dict, *k)?;
180                                span.metrics.insert(key, v.as_f64().ok_or_else(||anyhow!("Error reading span metrics, value is not an integer: {v}"))?);
181                            }
182                            Value::F64(v) => {
183                                let key = str_from_dict(dict, *k)?;
184                                span.metrics.insert(key, *v);
185                            }
186                            _ => anyhow::bail!(
187                                "Error reading span metrics, value is not a float or integer: {v}"
188                            ),
189                        }
190                    }
191                    _ => anyhow::bail!("Error reading span metrics, key is not an integer: {k}"),
192                }
193            }
194        }
195        val => anyhow::bail!("Error reading span metrics, value is not a map: {val}"),
196    }
197
198    // 11 - type
199    match read_value(reader)? {
200        Value::Integer(s) => span.r#type = str_from_dict(dict, s)?,
201        val => anyhow::bail!("Error reading span type, value is not an integer: {val}"),
202    }
203    Ok(span)
204}
205
206#[inline]
207fn str_from_dict(dict: &[String], id: Integer) -> anyhow::Result<String> {
208    let id = id
209        .as_i64()
210        .ok_or_else(|| anyhow!("Error reading string from dict, id is not an integer: {id}"))?
211        as usize;
212    if id >= dict.len() {
213        anyhow::bail!("Error reading string from dict, id out of bounds: {id}");
214    }
215    Ok(dict[id].to_string())
216}
217
218#[inline]
219fn get_v05_string(
220    reader: &mut Reader<impl Buf>,
221    dict: &[String],
222    field_name: &str,
223) -> anyhow::Result<String> {
224    match read_value(reader)? {
225        Value::Integer(s) => {
226            str_from_dict(dict, s)
227        },
228        val => anyhow::bail!("Error reading {field_name}, value is not an integer and can't be looked up in dict: {val}")
229    }
230}
231
232pub async fn get_v05_traces_from_request_body<B>(
233    body: B,
234) -> anyhow::Result<(usize, Vec<Vec<pb::Span>>)>
235where
236    B: http_body::Body,
237    B::Error: std::error::Error + Send + Sync + 'static,
238{
239    let buffer = body.collect().await?.aggregate();
240    let body_size = buffer.remaining();
241    let mut reader = buffer.reader();
242    let wrapper_size = read_array_len(&mut reader)?;
243    if wrapper_size != 2 {
244        anyhow::bail!("Expected an arrary of exactly 2 elements, got {wrapper_size}");
245    }
246
247    let dict = get_v05_strings_dict(&mut reader)?;
248
249    let traces_size = rmp::decode::read_array_len(&mut reader)?;
250    let mut traces: Vec<Vec<pb::Span>> = Default::default();
251
252    for _ in 0..traces_size {
253        let spans_size = rmp::decode::read_array_len(&mut reader)?;
254        let mut trace: Vec<pb::Span> = Default::default();
255
256        for _ in 0..spans_size {
257            let span = get_v05_span(&mut reader, &dict)?;
258            trace.push(span);
259        }
260        traces.push(trace);
261    }
262    Ok((body_size, traces))
263}
264
/// Tags extracted from a tracer payload's traces, used to populate top level tracer payload fields.
///
/// Every field defaults to an empty string.
#[derive(Default)]
pub struct TracerPayloadTags {
    /// Copied into the payload's `env` field.
    pub env: String,
    /// Copied into the payload's `app_version` field.
    pub app_version: String,
    /// Copied into the payload's `hostname` field.
    pub hostname: String,
    /// Copied into the payload's `runtime_id` field.
    pub runtime_id: String,
}
273
274/// Returns the first non-empty value of `field` found in `trace`, searching the root span first
275/// then all other spans.
276fn search_trace_for_field(root: &pb::Span, trace: &[pb::Span], field: &str) -> Option<String> {
277    if let Some(v) = root.meta.get(field) {
278        if !v.is_empty() {
279            return Some(v.clone());
280        }
281    }
282    for span in trace {
283        if span.span_id == root.span_id {
284            continue;
285        }
286        if let Some(v) = span.meta.get(field) {
287            if !v.is_empty() {
288                return Some(v.clone());
289            }
290        }
291    }
292    None
293}
294
295pub(crate) fn construct_trace_chunk(trace: Vec<pb::Span>) -> pb::TraceChunk {
296    pb::TraceChunk {
297        priority: normalizer::SamplerPriority::None as i32,
298        origin: "".to_string(),
299        spans: trace,
300        tags: HashMap::new(),
301        dropped_trace: false,
302    }
303}
304
305pub(crate) fn construct_tracer_payload(
306    chunks: Vec<pb::TraceChunk>,
307    tracer_tags: &TracerHeaderTags,
308    tracer_payload_tags: TracerPayloadTags,
309) -> pb::TracerPayload {
310    pb::TracerPayload {
311        app_version: tracer_payload_tags.app_version,
312        language_name: tracer_tags.lang.to_string(),
313        container_id: tracer_tags.container_id.to_string(),
314        env: tracer_payload_tags.env,
315        runtime_id: tracer_payload_tags.runtime_id,
316        chunks,
317        hostname: tracer_payload_tags.hostname,
318        language_version: tracer_tags.lang_version.to_string(),
319        tags: HashMap::new(),
320        tracer_version: tracer_tags.tracer_version.to_string(),
321    }
322}
323
324pub(crate) fn cmp_send_data_payloads(a: &pb::TracerPayload, b: &pb::TracerPayload) -> Ordering {
325    a.tracer_version
326        .cmp(&b.tracer_version)
327        .then(a.language_version.cmp(&b.language_version))
328        .then(a.language_name.cmp(&b.language_name))
329        .then(a.hostname.cmp(&b.hostname))
330        .then(a.container_id.cmp(&b.container_id))
331        .then(a.runtime_id.cmp(&b.runtime_id))
332        .then(a.env.cmp(&b.env))
333        .then(a.app_version.cmp(&b.app_version))
334}
335
336pub fn coalesce_send_data(mut data: Vec<SendData>) -> Vec<SendData> {
337    // TODO trace payloads with identical data except for chunk could be merged?
338
339    data.sort_unstable_by(|a, b| {
340        a.get_target()
341            .url
342            .to_string()
343            .cmp(&b.get_target().url.to_string())
344            .then(a.get_target().test_token.cmp(&b.get_target().test_token))
345    });
346    data.dedup_by(|a, b| {
347        if a.get_target().url == b.get_target().url
348            && a.get_target().test_token == b.get_target().test_token
349        {
350            // Size is only an approximation. In practice it won't vary much, but be safe here.
351            // We also don't care about the exact maximum size, like two 25 MB or one 50 MB request
352            // has similar results. The primary goal here is avoiding many small requests.
353            // TODO: maybe make the MAX_PAYLOAD_SIZE configurable?
354            if a.size + b.size < MAX_PAYLOAD_SIZE / 2 {
355                // Note: dedup_by drops a, and retains b.
356                b.tracer_payloads.append(&mut a.tracer_payloads);
357                b.size += a.size;
358                return true;
359            }
360        }
361        false
362    });
363    // Merge chunks with common properties. Reduces requests for agentful mode.
364    // And reduces a little bit of data for agentless.
365    for send_data in data.iter_mut() {
366        send_data.tracer_payloads.merge();
367    }
368    data
369}
370
371pub fn get_root_span_index(trace: &[pb::Span]) -> anyhow::Result<usize> {
372    if trace.is_empty() {
373        anyhow::bail!("Cannot find root span index in an empty trace.");
374    }
375
376    // Do a first pass to find if we have an obvious root span (starting from the end) since some
377    // clients put the root span last.
378    for (i, span) in trace.iter().enumerate().rev() {
379        if span.parent_id == 0 {
380            return Ok(i);
381        }
382    }
383
384    let mut span_ids: HashSet<u64> = HashSet::with_capacity(trace.len());
385    for span in trace.iter() {
386        span_ids.insert(span.span_id);
387    }
388
389    let mut root_span_id = None;
390    for (i, span) in trace.iter().enumerate() {
391        // If a span's parent is not in the trace, it is a root
392        if !span_ids.contains(&span.parent_id) {
393            if root_span_id.is_some() {
394                debug!(
395                    trace_id = &trace[0].trace_id,
396                    "trace has multiple root spans"
397                );
398            }
399            root_span_id = Some(i);
400        }
401    }
402    Ok(match root_span_id {
403        Some(i) => i,
404        None => {
405            debug!(
406                trace_id = &trace[0].trace_id,
407                "Could not find the root span for trace"
408            );
409            trace.len() - 1
410        }
411    })
412}
413
414/// Updates all the spans top-level attribute.
415/// A span is considered top-level if:
416///   - it's a root span
417///   - OR its parent is unknown (other part of the code, distributed trace)
418///   - OR its parent belongs to another service (in that case it's a "local root" being the highest
419///     ancestor of other spans belonging to this service and attached to it).
420pub fn compute_top_level_span(trace: &mut [pb::Span]) {
421    let mut span_id_to_service: HashMap<u64, String> = HashMap::new();
422    for span in trace.iter() {
423        span_id_to_service.insert(span.span_id, span.service.clone());
424    }
425    for span in trace.iter_mut() {
426        if span.parent_id == 0 {
427            set_top_level_span(span, true);
428            continue;
429        }
430        match span_id_to_service.get(&span.parent_id) {
431            Some(parent_span_service) => {
432                if !parent_span_service.eq(&span.service) {
433                    // parent is not in the same service
434                    set_top_level_span(span, true)
435                }
436            }
437            None => {
438                // span has no parent in chunk
439                set_top_level_span(span, true)
440            }
441        }
442    }
443}
444
445/// Return true if the span has a top level key set
446pub fn has_top_level(span: &pb::Span) -> bool {
447    span.metrics
448        .get(TRACER_TOP_LEVEL_KEY)
449        .is_some_and(|v| *v == 1.0)
450        || span.metrics.get(TOP_LEVEL_KEY).is_some_and(|v| *v == 1.0)
451}
452
453fn set_top_level_span(span: &mut pb::Span, is_top_level: bool) {
454    if is_top_level {
455        span.metrics.insert(TOP_LEVEL_KEY.to_string(), 1.0);
456    } else {
457        span.metrics.remove(TOP_LEVEL_KEY);
458    }
459}
460
461pub fn set_serverless_root_span_tags(
462    span: &mut pb::Span,
463    app_name: Option<String>,
464    env_type: &EnvironmentType,
465) {
466    let origin_tag = match env_type {
467        EnvironmentType::CloudFunction => "cloudfunction",
468        EnvironmentType::AzureFunction => "azurefunction",
469        EnvironmentType::AzureSpringApp => "azurespringapp",
470        EnvironmentType::LambdaFunction => "lambda", // historical reasons
471    };
472    span.meta
473        .insert("_dd.origin".to_string(), origin_tag.to_string());
474    span.meta
475        .insert("origin".to_string(), origin_tag.to_string());
476
477    if let Some(function_name) = app_name {
478        match env_type {
479            EnvironmentType::CloudFunction
480            | EnvironmentType::AzureFunction
481            | EnvironmentType::LambdaFunction => {
482                span.meta.insert("functionname".to_string(), function_name);
483            }
484            _ => {}
485        }
486    }
487}
488
489fn update_tracer_top_level(span: &mut pb::Span) {
490    if span.metrics.contains_key(TRACER_TOP_LEVEL_KEY) {
491        span.metrics.insert(TOP_LEVEL_KEY.to_string(), 1.0);
492    }
493}
494
/// The serverless environment a span originates from; it selects the origin tag written by
/// `set_serverless_root_span_tags`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum EnvironmentType {
    /// Tagged with origin `cloudfunction`.
    CloudFunction,
    /// Tagged with origin `azurefunction`.
    AzureFunction,
    /// Tagged with origin `azurespringapp`.
    AzureSpringApp,
    /// Tagged with origin `lambda`.
    LambdaFunction,
}
502
/// Optional metadata about the environment the mini agent runs in, used to enrich spans.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct MiniAgentMetadata {
    /// Written to the `asa.hostname` span tag when present.
    pub azure_spring_app_hostname: Option<String>,
    /// Written to the `asa.name` span tag when present.
    pub azure_spring_app_name: Option<String>,
    /// Written to the `gcrfx.project_id` span tag and used in `gcrfx.resource_name`.
    pub gcp_project_id: Option<String>,
    /// Written to the `gcrfx.location` span tag and used in `gcrfx.resource_name`.
    pub gcp_region: Option<String>,
    /// Written to the `_dd.serverless_compat_version` span tag when present.
    pub version: Option<String>,
}
511
512impl Default for MiniAgentMetadata {
513    fn default() -> Self {
514        MiniAgentMetadata {
515            azure_spring_app_hostname: Default::default(),
516            azure_spring_app_name: Default::default(),
517            gcp_project_id: Default::default(),
518            gcp_region: Default::default(),
519            version: env::var("DD_SERVERLESS_COMPAT_VERSION").ok(),
520        }
521    }
522}
523
524pub fn enrich_span_with_mini_agent_metadata(
525    span: &mut pb::Span,
526    mini_agent_metadata: &MiniAgentMetadata,
527) {
528    if let Some(azure_spring_app_hostname) = &mini_agent_metadata.azure_spring_app_hostname {
529        span.meta.insert(
530            "asa.hostname".to_string(),
531            azure_spring_app_hostname.to_string(),
532        );
533    }
534    if let Some(azure_spring_app_name) = &mini_agent_metadata.azure_spring_app_name {
535        span.meta
536            .insert("asa.name".to_string(), azure_spring_app_name.to_string());
537    }
538    if let Some(serverless_compat_version) = &mini_agent_metadata.version {
539        span.meta.insert(
540            "_dd.serverless_compat_version".to_string(),
541            serverless_compat_version.to_string(),
542        );
543    }
544}
545
546pub fn enrich_span_with_google_cloud_function_metadata(
547    span: &mut pb::Span,
548    mini_agent_metadata: &MiniAgentMetadata,
549    function: Option<String>,
550) {
551    #[allow(clippy::todo)]
552    let Some(region) = &mini_agent_metadata.gcp_region
553    else {
554        todo!()
555    };
556    #[allow(clippy::todo)]
557    let Some(project) = &mini_agent_metadata.gcp_project_id
558    else {
559        todo!()
560    };
561
562    if let Some(function) = function {
563        if !region.is_empty() && !project.is_empty() {
564            let resource_name = format!(
565                "projects/{}/locations/{}/functions/{}",
566                project, region, function
567            );
568
569            span.meta
570                .insert("gcrfx.location".to_string(), region.to_string());
571            span.meta
572                .insert("gcrfx.project_id".to_string(), project.to_string());
573            span.meta
574                .insert("gcrfx.resource_name".to_string(), resource_name.to_string());
575        }
576    }
577}
578
579pub fn enrich_span_with_azure_function_metadata(span: &mut pb::Span) {
580    if span.name == "azure.apim" {
581        return;
582    }
583
584    if let Some(aas_metadata) = &*azure_app_services::AAS_METADATA_FUNCTION {
585        span.meta.extend(
586            aas_metadata
587                .get_function_tags()
588                .map(|(name, value)| (name.to_string(), value.to_string())),
589        );
590    }
591}
592
593pub fn collect_trace_chunks<T: TraceData>(
594    traces: Vec<Vec<crate::span::v04::Span<T>>>,
595    format: TraceEncoding,
596) -> anyhow::Result<TraceChunks<T>> {
597    match format {
598        TraceEncoding::V05 => {
599            let mut shared_dict = SharedDict::default();
600            let mut v05_traces: Vec<Vec<v05::Span>> = Vec::with_capacity(traces.len());
601            for trace in traces {
602                let v05_trace = trace
603                    .into_iter()
604                    .map(|span| v05::from_v04_span(span, &mut shared_dict))
605                    .collect::<anyhow::Result<Vec<_>>>()?;
606                v05_traces.push(v05_trace);
607            }
608            Ok(TraceChunks::V05((shared_dict, v05_traces)))
609        }
610        TraceEncoding::V04 => Ok(TraceChunks::V04(traces)),
611    }
612}
613
614pub fn collect_pb_trace_chunks<T: tracer_payload::TraceChunkProcessor>(
615    mut traces: Vec<Vec<pb::Span>>,
616    tracer_header_tags: &TracerHeaderTags,
617    process_chunk: &mut T,
618    is_agentless: bool,
619) -> anyhow::Result<TracerPayloadCollection> {
620    let mut trace_chunks: Vec<pb::TraceChunk> = Vec::new();
621
622    // We'll skip setting the global metadata and rely on the agent to unpack these
623    let mut tracer_payload_tags = TracerPayloadTags::default();
624
625    for trace in traces.iter_mut() {
626        if is_agentless {
627            if let Err(e) = normalizer::normalize_trace(trace) {
628                error!("Error normalizing trace: {e}");
629            }
630        }
631
632        let mut chunk = construct_trace_chunk(trace.to_vec());
633
634        let root_span_index = match get_root_span_index(trace) {
635            Ok(res) => res,
636            Err(e) => {
637                error!("Error getting the root span index of a trace, skipping. {e}");
638                continue;
639            }
640        };
641
642        if let Err(e) = normalizer::normalize_chunk(&mut chunk, root_span_index) {
643            error!("Error normalizing trace chunk: {e}");
644        }
645
646        for span in chunk.spans.iter_mut() {
647            // TODO: obfuscate & truncate spans
648            if tracer_header_tags.client_computed_top_level {
649                update_tracer_top_level(span);
650            }
651        }
652
653        if !tracer_header_tags.client_computed_top_level {
654            compute_top_level_span(&mut chunk.spans);
655        }
656
657        process_chunk.process(&mut chunk, root_span_index);
658
659        trace_chunks.push(chunk);
660
661        if is_agentless {
662            // Check each field independently so that a later trace can fill in fields missing
663            // from an earlier trace.
664            let root = &trace[root_span_index];
665            if tracer_payload_tags.env.is_empty() {
666                if let Some(mut v) = search_trace_for_field(root, trace, "env") {
667                    // Normalize env tag in case the span it was pulled from was skipped during
668                    // normalization
669                    libdd_trace_normalization::normalize_utils::normalize_tag(&mut v);
670                    if !v.is_empty() {
671                        tracer_payload_tags.env = v;
672                    }
673                }
674            }
675            if tracer_payload_tags.app_version.is_empty() {
676                if let Some(v) = search_trace_for_field(root, trace, "version") {
677                    tracer_payload_tags.app_version = v;
678                }
679            }
680            if tracer_payload_tags.hostname.is_empty() {
681                if let Some(v) = search_trace_for_field(root, trace, "_dd.hostname") {
682                    tracer_payload_tags.hostname = v;
683                }
684            }
685            if tracer_payload_tags.runtime_id.is_empty() {
686                if let Some(v) = search_trace_for_field(root, trace, "runtime-id") {
687                    tracer_payload_tags.runtime_id = v;
688                }
689            }
690        }
691    }
692
693    Ok(TracerPayloadCollection::V07(vec![
694        construct_tracer_payload(trace_chunks, tracer_header_tags, tracer_payload_tags),
695    ]))
696}
697
698/// Returns true if a span should be measured (i.e., it should get trace metrics calculated).
699pub fn is_measured(span: &pb::Span) -> bool {
700    span.metrics.get(MEASURED_KEY).is_some_and(|v| *v == 1.0)
701}
702
703/// Returns true if the span is a partial snapshot.
704/// This kind of spans are partial images of long-running spans.
705/// When incomplete, a partial snapshot has a metric _dd.partial_version which is a positive
706/// integer. The metric usually increases each time a new version of the same span is sent by the
707/// tracer
708pub fn is_partial_snapshot(span: &pb::Span) -> bool {
709    span.metrics
710        .get(PARTIAL_VERSION_KEY)
711        .is_some_and(|v| *v >= 0.0)
712}
713
714#[cfg(test)]
715mod tests {
716    use super::*;
717    use crate::{
718        span::SharedDictBytes,
719        test_utils::{create_test_no_alloc_span, create_test_span},
720    };
721    use http::Request;
722    use libdd_common::{http_common, Endpoint};
723    use serde_json::json;
724
725    fn find_index_in_dict(dict: &SharedDictBytes, value: &str) -> Option<u32> {
726        let idx = dict.iter().position(|e| e.as_str() == value);
727        idx.map(|idx| idx.try_into().unwrap())
728    }
729
730    #[test]
731    fn test_coalescing_does_not_exceed_max_size() {
732        fn dummy() -> SendData {
733            SendData::new(
734                MAX_PAYLOAD_SIZE / 5 + 1,
735                TracerPayloadCollection::V07(vec![pb::TracerPayload {
736                    container_id: "".to_string(),
737                    language_name: "".to_string(),
738                    language_version: "".to_string(),
739                    tracer_version: "".to_string(),
740                    runtime_id: "".to_string(),
741                    chunks: vec![pb::TraceChunk {
742                        priority: 0,
743                        origin: "".to_string(),
744                        spans: vec![],
745                        tags: Default::default(),
746                        dropped_trace: false,
747                    }],
748                    tags: Default::default(),
749                    env: "".to_string(),
750                    hostname: "".to_string(),
751                    app_version: "".to_string(),
752                }]),
753                TracerHeaderTags::default(),
754                &Endpoint::default(),
755            )
756        }
757        let coalesced = coalesce_send_data(vec![dummy(), dummy(), dummy(), dummy(), dummy()]);
758        assert_eq!(
759            5,
760            coalesced
761                .iter()
762                .map(|s| s.tracer_payloads.size())
763                .sum::<usize>()
764        );
765        // assert some chunks are actually coalesced
766        assert!(
767            coalesced
768                .iter()
769                .map(|s| {
770                    if let TracerPayloadCollection::V07(collection) = &s.tracer_payloads {
771                        collection.iter().map(|s| s.chunks.len()).max().unwrap()
772                    } else {
773                        0
774                    }
775                })
776                .max()
777                .unwrap()
778                > 1
779        );
780        assert!(coalesced.len() > 1 && coalesced.len() < 5);
781    }
782
783    #[tokio::test]
784    #[allow(clippy::type_complexity)]
785    #[cfg_attr(all(miri, target_os = "macos"), ignore)]
786    async fn test_get_v05_traces_from_request_body() {
787        let data: (
788            Vec<String>,
789            Vec<
790                Vec<(
791                    u8,
792                    u8,
793                    u8,
794                    u64,
795                    u64,
796                    u64,
797                    i64,
798                    i64,
799                    i32,
800                    HashMap<u8, u8>,
801                    HashMap<u8, f64>,
802                    u8,
803                )>,
804            >,
805        ) = (
806            vec![
807                "baggage".to_string(),
808                "item".to_string(),
809                "elasticsearch.version".to_string(),
810                "7.0".to_string(),
811                "my-name".to_string(),
812                "X".to_string(),
813                "my-service".to_string(),
814                "my-resource".to_string(),
815                "_dd.sampling_rate_whatever".to_string(),
816                "value whatever".to_string(),
817                "sql".to_string(),
818            ],
819            vec![vec![(
820                6,
821                4,
822                7,
823                1,
824                2,
825                3,
826                123,
827                456,
828                1,
829                HashMap::from([(8, 9), (0, 1), (2, 3)]),
830                HashMap::from([(5, 1.2)]),
831                10,
832            )]],
833        );
834        let bytes = rmp_serde::to_vec(&data).unwrap();
835        let res = get_v05_traces_from_request_body(http_common::Body::from(bytes)).await;
836        assert!(res.is_ok());
837        let (_, traces) = res.unwrap();
838        let span = traces[0][0].clone();
839        let test_span = pb::Span {
840            service: "my-service".to_string(),
841            name: "my-name".to_string(),
842            resource: "my-resource".to_string(),
843            trace_id: 1,
844            span_id: 2,
845            parent_id: 3,
846            start: 123,
847            duration: 456,
848            error: 1,
849            meta: HashMap::from([
850                ("baggage".to_string(), "item".to_string()),
851                ("elasticsearch.version".to_string(), "7.0".to_string()),
852                (
853                    "_dd.sampling_rate_whatever".to_string(),
854                    "value whatever".to_string(),
855                ),
856            ]),
857            metrics: HashMap::from([("X".to_string(), 1.2)]),
858            meta_struct: HashMap::default(),
859            r#type: "sql".to_string(),
860            span_links: vec![],
861            span_events: vec![],
862        };
863        assert_eq!(span, test_span);
864    }
865
866    #[tokio::test]
867    #[cfg_attr(miri, ignore)]
868    async fn test_get_traces_from_request_body() {
869        let pairs = vec![
870            (
871                json!([{
872                    "service": "test-service",
873                    "name": "test-service-name",
874                    "resource": "test-service-resource",
875                    "trace_id": 111,
876                    "span_id": 222,
877                    "parent_id": 333,
878                    "start": 1,
879                    "duration": 5,
880                    "error": 0,
881                    "meta": {},
882                    "metrics": {},
883                }]),
884                vec![vec![pb::Span {
885                    service: "test-service".to_string(),
886                    name: "test-service-name".to_string(),
887                    resource: "test-service-resource".to_string(),
888                    trace_id: 111,
889                    span_id: 222,
890                    parent_id: 333,
891                    start: 1,
892                    duration: 5,
893                    error: 0,
894                    meta: HashMap::new(),
895                    metrics: HashMap::new(),
896                    meta_struct: HashMap::new(),
897                    r#type: "".to_string(),
898                    span_links: vec![],
899                    span_events: vec![],
900                }]],
901            ),
902            (
903                json!([{
904                    "name": "test-service-name",
905                    "resource": "test-service-resource",
906                    "trace_id": 111,
907                    "span_id": 222,
908                    "start": 1,
909                    "duration": 5,
910                    "meta": {},
911                }]),
912                vec![vec![pb::Span {
913                    service: "".to_string(),
914                    name: "test-service-name".to_string(),
915                    resource: "test-service-resource".to_string(),
916                    trace_id: 111,
917                    span_id: 222,
918                    parent_id: 0,
919                    start: 1,
920                    duration: 5,
921                    error: 0,
922                    meta: HashMap::new(),
923                    metrics: HashMap::new(),
924                    meta_struct: HashMap::new(),
925                    r#type: "".to_string(),
926                    span_links: vec![],
927                    span_events: vec![],
928                }]],
929            ),
930        ];
931
932        for (trace_input, output) in pairs {
933            let bytes = rmp_serde::to_vec(&vec![&trace_input]).unwrap();
934            let request = Request::builder()
935                .body(http_common::Body::from(bytes))
936                .unwrap();
937            let res = get_traces_from_request_body(request.into_body()).await;
938            assert!(res.is_ok());
939            assert_eq!(res.unwrap().1, output);
940        }
941    }
942
943    #[tokio::test]
944    #[cfg_attr(miri, ignore)]
945    async fn test_get_traces_from_request_body_with_span_links() {
946        let trace_input = json!([[{
947            "service": "test-service",
948            "name": "test-name",
949            "resource": "test-resource",
950            "trace_id": 111,
951            "span_id": 222,
952            "parent_id": 333,
953            "start": 1,
954            "duration": 5,
955            "error": 0,
956            "meta": {},
957            "metrics": {},
958            "span_links": [{
959                "trace_id": 999,
960                "span_id": 888,
961                "trace_id_high": 777,
962                "attributes": {"key": "value"},
963                "tracestate": "vendor=value"
964                // flags field intentionally omitted
965            }]
966        }]]);
967
968        let expected_output = vec![vec![pb::Span {
969            service: "test-service".to_string(),
970            name: "test-name".to_string(),
971            resource: "test-resource".to_string(),
972            trace_id: 111,
973            span_id: 222,
974            parent_id: 333,
975            start: 1,
976            duration: 5,
977            error: 0,
978            meta: HashMap::new(),
979            metrics: HashMap::new(),
980            meta_struct: HashMap::new(),
981            r#type: String::new(),
982            span_links: vec![pb::SpanLink {
983                trace_id: 999,
984                span_id: 888,
985                trace_id_high: 777,
986                attributes: HashMap::from([("key".to_string(), "value".to_string())]),
987                tracestate: "vendor=value".to_string(),
988                flags: 0, // Should default to 0 when omitted
989            }],
990            span_events: vec![],
991        }]];
992
993        let bytes = rmp_serde::to_vec(&trace_input).unwrap();
994        let request = Request::builder()
995            .body(http_common::Body::from(bytes))
996            .unwrap();
997
998        let res = get_traces_from_request_body(request.into_body()).await;
999        assert!(res.is_ok(), "Failed to deserialize: {res:?}");
1000        assert_eq!(res.unwrap().1, expected_output);
1001    }
1002
1003    #[test]
1004    fn test_get_root_span_index_from_complete_trace() {
1005        let trace = vec![
1006            create_test_span(1234, 12341, 0, 1, false),
1007            create_test_span(1234, 12342, 12341, 1, false),
1008            create_test_span(1234, 12343, 12342, 1, false),
1009        ];
1010
1011        let root_span_index = get_root_span_index(&trace);
1012        assert!(root_span_index.is_ok());
1013        assert_eq!(root_span_index.unwrap(), 0);
1014    }
1015
1016    #[test]
1017    fn test_get_root_span_index_from_partial_trace() {
1018        let trace = vec![
1019            create_test_span(1234, 12342, 12341, 1, false),
1020            create_test_span(1234, 12341, 12340, 1, false), /* this is the root span, it's
1021                                                             * parent is not in the trace */
1022            create_test_span(1234, 12343, 12342, 1, false),
1023        ];
1024
1025        let root_span_index = get_root_span_index(&trace);
1026        assert!(root_span_index.is_ok());
1027        assert_eq!(root_span_index.unwrap(), 1);
1028    }
1029
1030    #[test]
1031    fn test_set_serverless_root_span_tags_azure_function() {
1032        let mut span = create_test_span(1234, 12342, 12341, 1, false);
1033        set_serverless_root_span_tags(
1034            &mut span,
1035            Some("test_function".to_string()),
1036            &EnvironmentType::AzureFunction,
1037        );
1038        assert_eq!(
1039            span.meta,
1040            HashMap::from([
1041                (
1042                    "runtime-id".to_string(),
1043                    "test-runtime-id-value".to_string()
1044                ),
1045                ("_dd.origin".to_string(), "azurefunction".to_string()),
1046                ("origin".to_string(), "azurefunction".to_string()),
1047                ("functionname".to_string(), "test_function".to_string()),
1048                ("env".to_string(), "test-env".to_string()),
1049                ("service".to_string(), "test-service".to_string())
1050            ]),
1051        );
1052    }
1053
1054    #[test]
1055    fn test_set_serverless_root_span_tags_cloud_function() {
1056        let mut span = create_test_span(1234, 12342, 12341, 1, false);
1057        set_serverless_root_span_tags(
1058            &mut span,
1059            Some("test_function".to_string()),
1060            &EnvironmentType::CloudFunction,
1061        );
1062        assert_eq!(
1063            span.meta,
1064            HashMap::from([
1065                (
1066                    "runtime-id".to_string(),
1067                    "test-runtime-id-value".to_string()
1068                ),
1069                ("_dd.origin".to_string(), "cloudfunction".to_string()),
1070                ("origin".to_string(), "cloudfunction".to_string()),
1071                ("functionname".to_string(), "test_function".to_string()),
1072                ("env".to_string(), "test-env".to_string()),
1073                ("service".to_string(), "test-service".to_string())
1074            ]),
1075        );
1076    }
1077
1078    #[test]
1079    fn test_has_top_level() {
1080        let top_level_span = create_test_span(123, 1234, 12, 1, true);
1081        let not_top_level_span = create_test_span(123, 1234, 12, 1, false);
1082        assert!(has_top_level(&top_level_span));
1083        assert!(!has_top_level(&not_top_level_span));
1084    }
1085
1086    #[test]
1087    fn test_is_measured() {
1088        let mut measured_span = create_test_span(123, 1234, 12, 1, true);
1089        measured_span.metrics.insert(MEASURED_KEY.into(), 1.0);
1090        let not_measured_span = create_test_span(123, 1234, 12, 1, true);
1091        assert!(is_measured(&measured_span));
1092        assert!(!is_measured(&not_measured_span));
1093    }
1094
1095    #[test]
1096    fn test_compute_top_level() {
1097        let mut span_with_different_service = create_test_span(123, 5, 2, 1, false);
1098        span_with_different_service.service = "another_service".into();
1099        let mut trace = vec![
1100            // Root span, should be marked as top-level
1101            create_test_span(123, 1, 0, 1, false),
1102            // Should not be marked as top-level
1103            create_test_span(123, 2, 1, 1, false),
1104            // No parent in local trace, should be marked as
1105            // top-level
1106            create_test_span(123, 4, 3, 1, false),
1107            // Parent belongs to another service, should be marked
1108            // as top-level
1109            span_with_different_service,
1110        ];
1111
1112        compute_top_level_span(trace.as_mut_slice());
1113
1114        let spans_marked_as_top_level: Vec<u64> = trace
1115            .iter()
1116            .filter_map(|span| {
1117                if has_top_level(span) {
1118                    Some(span.span_id)
1119                } else {
1120                    None
1121                }
1122            })
1123            .collect();
1124        assert_eq!(spans_marked_as_top_level, [1, 4, 5])
1125    }
1126
1127    #[test]
1128    fn test_collect_trace_chunks_v05() {
1129        let chunk = vec![create_test_no_alloc_span(123, 456, 789, 1, true)];
1130
1131        let collection = collect_trace_chunks(vec![chunk], TraceEncoding::V05).unwrap();
1132
1133        let (dict, traces) = match collection {
1134            TraceChunks::V05(payload) => payload,
1135            _ => panic!("Unexpected type"),
1136        };
1137
1138        assert_eq!(dict.len(), 16);
1139
1140        let span = &traces[0][0];
1141        assert_eq!(span.service, 1);
1142        assert_eq!(span.name, 2);
1143        assert_eq!(span.resource, 3);
1144        assert_eq!(span.trace_id, 123);
1145        assert_eq!(span.span_id, 456);
1146        assert_eq!(span.parent_id, 789);
1147        assert_eq!(span.start, 1);
1148        assert_eq!(span.error, 0);
1149        assert_eq!(span.error, 0);
1150        assert_eq!(span.r#type, 15);
1151        assert_eq!(
1152            *span
1153                .meta
1154                .get(&find_index_in_dict(&dict, "service").unwrap())
1155                .unwrap(),
1156            find_index_in_dict(&dict, "test-service").unwrap()
1157        );
1158        assert_eq!(
1159            *span
1160                .meta
1161                .get(&find_index_in_dict(&dict, "env").unwrap())
1162                .unwrap(),
1163            find_index_in_dict(&dict, "test-env").unwrap()
1164        );
1165        assert_eq!(
1166            *span
1167                .meta
1168                .get(&find_index_in_dict(&dict, "runtime-id").unwrap())
1169                .unwrap(),
1170            find_index_in_dict(&dict, "test-runtime-id-value").unwrap()
1171        );
1172        assert_eq!(
1173            *span
1174                .meta
1175                .get(&find_index_in_dict(&dict, "_dd.origin").unwrap())
1176                .unwrap(),
1177            find_index_in_dict(&dict, "cloudfunction").unwrap()
1178        );
1179        assert_eq!(
1180            *span
1181                .meta
1182                .get(&find_index_in_dict(&dict, "origin").unwrap())
1183                .unwrap(),
1184            find_index_in_dict(&dict, "cloudfunction").unwrap()
1185        );
1186        assert_eq!(
1187            *span
1188                .meta
1189                .get(&find_index_in_dict(&dict, "functionname").unwrap())
1190                .unwrap(),
1191            find_index_in_dict(&dict, "dummy_function_name").unwrap()
1192        );
1193        assert_eq!(
1194            *span
1195                .metrics
1196                .get(&find_index_in_dict(&dict, "_top_level").unwrap())
1197                .unwrap(),
1198            1.0
1199        );
1200    }
1201
1202    #[test]
1203    fn test_collect_trace_chunks_v04() {
1204        let chunk = vec![create_test_no_alloc_span(123, 456, 789, 1, true)];
1205
1206        let collection = collect_trace_chunks(vec![chunk], TraceEncoding::V04).unwrap();
1207
1208        let traces = match collection {
1209            TraceChunks::V04(traces) => traces,
1210            _ => panic!("Unexpected type"),
1211        };
1212
1213        assert_eq!(traces.len(), 1);
1214        assert_eq!(traces[0].len(), 1);
1215        let span = &traces[0][0];
1216        assert_eq!(span.trace_id, 123);
1217        assert_eq!(span.span_id, 456);
1218        assert_eq!(span.parent_id, 789);
1219        assert_eq!(span.start, 1);
1220        assert_eq!(span.error, 0);
1221    }
1222
1223    #[test]
1224    fn test_rmp_serde_deserialize_meta_with_null_values() {
1225        // Create a JSON representation with null value in meta
1226        let span_json = json!({
1227            "service": "test-service",
1228            "name": "test_name",
1229            "resource": "test-resource",
1230            "trace_id": 1_u64,
1231            "span_id": 2_u64,
1232            "parent_id": 0_u64,
1233            "start": 0_i64,
1234            "duration": 5_i64,
1235            "error": 0_i32,
1236            "meta": {
1237                "service": "test-service",
1238                "env": "test-env",
1239                "runtime-id": "test-runtime-id-value",
1240                "problematic_key": null  // Ensure this null value does not cause an error
1241            },
1242            "metrics": {},
1243            "type": "",
1244            "meta_struct": {},
1245            "span_links": [],
1246            "span_events": []
1247        });
1248
1249        let traces_json = vec![vec![span_json]];
1250        let encoded_data = rmp_serde::to_vec(&traces_json).unwrap();
1251        let traces: Vec<Vec<pb::Span>> = rmp_serde::from_read(&encoded_data[..])
1252            .expect("Failed to deserialize traces with null values in meta");
1253
1254        assert_eq!(1, traces.len());
1255        assert_eq!(1, traces[0].len());
1256        let decoded_span = &traces[0][0];
1257
1258        assert_eq!("test-service", decoded_span.service);
1259        assert_eq!("test_name", decoded_span.name);
1260        assert_eq!("test-resource", decoded_span.resource);
1261        assert_eq!("test-service", decoded_span.meta.get("service").unwrap());
1262        assert_eq!("test-env", decoded_span.meta.get("env").unwrap());
1263        assert_eq!(
1264            "test-runtime-id-value",
1265            decoded_span.meta.get("runtime-id").unwrap()
1266        );
1267        // Assert that the null value was filtered out (key not present in map)
1268        assert!(
1269            !decoded_span.meta.contains_key("problematic_key"),
1270            "Null value should be skipped, but key was present"
1271        );
1272    }
1273
1274    #[test]
1275    fn test_enrich_span_with_azure_function_metadata_adds_tags_for_non_apim() {
1276        let mut span = create_test_span(1234, 12342, 12341, 1, false);
1277        span.name = "azure.function".to_string();
1278
1279        enrich_span_with_azure_function_metadata(&mut span);
1280
1281        // If AAS_METADATA_FUNCTION is available, verify aas.* tags were added
1282        // If not available (most test environments), this is a no-op
1283        // This test primarily ensures the function doesn't skip non-apim spans
1284        if azure_app_services::AAS_METADATA_FUNCTION.is_some() {
1285            assert!(span.meta.contains_key("aas.resource.id"));
1286            assert!(span.meta.contains_key("aas.environment.instance_id"));
1287            assert!(span.meta.contains_key("aas.environment.instance_name"));
1288            assert!(span.meta.contains_key("aas.subscription.id"));
1289            assert!(span.meta.contains_key("aas.environment.os"));
1290            assert!(span.meta.contains_key("aas.environment.runtime"));
1291            assert!(span.meta.contains_key("aas.environment.runtime_version"));
1292            assert!(span.meta.contains_key("aas.environment.function_runtime"));
1293            assert!(span.meta.contains_key("aas.resource.group"));
1294            assert!(span.meta.contains_key("aas.site.name"));
1295            assert!(span.meta.contains_key("aas.site.kind"));
1296            assert!(span.meta.contains_key("aas.site.type"));
1297        }
1298    }
1299
1300    #[test]
1301    fn test_enrich_span_with_azure_function_metadata_skips_azure_apim() {
1302        let mut span = create_test_span(1234, 12342, 12341, 1, false);
1303        span.name = "azure.apim".to_string();
1304
1305        enrich_span_with_azure_function_metadata(&mut span);
1306
1307        // Verify no aas.* tags were added
1308        assert!(!span.meta.contains_key("aas.resource.id"));
1309        assert!(!span.meta.contains_key("aas.environment.instance_id"));
1310        assert!(!span.meta.contains_key("aas.environment.instance_name"));
1311        assert!(!span.meta.contains_key("aas.subscription.id"));
1312        assert!(!span.meta.contains_key("aas.environment.os"));
1313        assert!(!span.meta.contains_key("aas.environment.runtime"));
1314        assert!(!span.meta.contains_key("aas.environment.runtime_version"));
1315        assert!(!span.meta.contains_key("aas.environment.function_runtime"));
1316        assert!(!span.meta.contains_key("aas.resource.group"));
1317        assert!(!span.meta.contains_key("aas.site.name"));
1318        assert!(!span.meta.contains_key("aas.site.kind"));
1319        assert!(!span.meta.contains_key("aas.site.type"));
1320    }
1321
1322    #[test]
1323    fn test_collect_pb_trace_chunks_searches_multiple_root_spans_for_fields() {
1324        // First trace root span has no fields. Second trace root span has all fields.
1325        // The second root span should populate all fields.
1326        let mut first_root_span = create_test_span(1, 1, 0, 1, true);
1327        first_root_span.meta.remove("env");
1328        first_root_span.meta.remove("runtime-id");
1329
1330        let mut second_root_span = create_test_span(2, 3, 0, 1, true);
1331        second_root_span
1332            .meta
1333            .insert("version".to_string(), "1.2.3".to_string());
1334        second_root_span
1335            .meta
1336            .insert("env".to_string(), "prod".to_string());
1337        second_root_span
1338            .meta
1339            .insert("_dd.hostname".to_string(), "my-host".to_string());
1340        second_root_span
1341            .meta
1342            .insert("runtime-id".to_string(), "123".to_string());
1343
1344        let result = collect_pb_trace_chunks(
1345            vec![vec![first_root_span], vec![second_root_span]],
1346            &TracerHeaderTags::default(),
1347            &mut tracer_payload::DefaultTraceChunkProcessor,
1348            true,
1349        )
1350        .unwrap();
1351
1352        let TracerPayloadCollection::V07(payloads) = result else {
1353            panic!("expected TracerPayloadCollection::V07");
1354        };
1355        assert_eq!(payloads[0].app_version, "1.2.3");
1356        assert_eq!(payloads[0].env, "prod");
1357        assert_eq!(payloads[0].hostname, "my-host");
1358        assert_eq!(payloads[0].runtime_id, "123");
1359    }
1360
1361    #[test]
1362    fn test_collect_pb_trace_chunks_searches_non_root_spans_for_fields() {
1363        // Root span has no fields. Child span has all fields.
1364        // The child span should populate all fields.
1365        let mut root_span = create_test_span(1, 1, 0, 1, true);
1366        root_span.meta.remove("env");
1367        root_span.meta.remove("runtime-id");
1368        let mut child_span = create_test_span(1, 2, 1, 1, false);
1369        child_span
1370            .meta
1371            .insert("version".to_string(), "1.2.3".to_string());
1372        child_span
1373            .meta
1374            .insert("env".to_string(), "prod".to_string());
1375        child_span
1376            .meta
1377            .insert("_dd.hostname".to_string(), "my-host".to_string());
1378        child_span
1379            .meta
1380            .insert("runtime-id".to_string(), "123".to_string());
1381
1382        let result = collect_pb_trace_chunks(
1383            vec![vec![root_span, child_span]],
1384            &TracerHeaderTags::default(),
1385            &mut tracer_payload::DefaultTraceChunkProcessor,
1386            true,
1387        )
1388        .unwrap();
1389
1390        let TracerPayloadCollection::V07(payloads) = result else {
1391            panic!("expected TracerPayloadCollection::V07");
1392        };
1393        assert_eq!(payloads[0].app_version, "1.2.3");
1394        assert_eq!(payloads[0].env, "prod");
1395        assert_eq!(payloads[0].hostname, "my-host");
1396        assert_eq!(payloads[0].runtime_id, "123");
1397    }
1398
1399    #[test]
1400    fn test_collect_pb_trace_chunks_root_span_takes_priority_over_child() {
1401        // Root span has all fields. Child has different values for all fields.
1402        // The root span should populate all fields.
1403        let mut root_span = create_test_span(1, 1, 0, 1, true);
1404        root_span
1405            .meta
1406            .insert("version".to_string(), "root-version".to_string());
1407        root_span
1408            .meta
1409            .insert("env".to_string(), "root-env".to_string());
1410        root_span
1411            .meta
1412            .insert("_dd.hostname".to_string(), "root-host".to_string());
1413        root_span
1414            .meta
1415            .insert("runtime-id".to_string(), "root-runtime-id".to_string());
1416
1417        let mut child_span = create_test_span(1, 2, 1, 1, false);
1418        child_span
1419            .meta
1420            .insert("version".to_string(), "child-version".to_string());
1421        child_span
1422            .meta
1423            .insert("env".to_string(), "child-env".to_string());
1424        child_span
1425            .meta
1426            .insert("_dd.hostname".to_string(), "child-host".to_string());
1427        child_span
1428            .meta
1429            .insert("runtime-id".to_string(), "child-runtime-id".to_string());
1430
1431        let result = collect_pb_trace_chunks(
1432            vec![vec![root_span, child_span]],
1433            &TracerHeaderTags::default(),
1434            &mut tracer_payload::DefaultTraceChunkProcessor,
1435            true,
1436        )
1437        .unwrap();
1438
1439        let TracerPayloadCollection::V07(payloads) = result else {
1440            panic!("expected TracerPayloadCollection::V07");
1441        };
1442        assert_eq!(payloads[0].app_version, "root-version");
1443        assert_eq!(payloads[0].env, "root-env");
1444        assert_eq!(payloads[0].hostname, "root-host");
1445        assert_eq!(payloads[0].runtime_id, "root-runtime-id");
1446    }
1447
1448    #[test]
1449    fn test_collect_pb_trace_chunks_skips_empty_root_span_value() {
1450        // Root span has empty values for all fields. Child span has non-empty values.
1451        // The child span should populate all fields.
1452        let mut root_span = create_test_span(1, 1, 0, 1, true);
1453        root_span.meta.insert("version".to_string(), "".to_string());
1454        root_span.meta.insert("env".to_string(), "".to_string());
1455        root_span
1456            .meta
1457            .insert("_dd.hostname".to_string(), "".to_string());
1458        root_span
1459            .meta
1460            .insert("runtime-id".to_string(), "".to_string());
1461
1462        let mut child_span = create_test_span(1, 2, 1, 1, false);
1463        child_span
1464            .meta
1465            .insert("version".to_string(), "1.2.3".to_string());
1466        child_span
1467            .meta
1468            .insert("env".to_string(), "prod".to_string());
1469        child_span
1470            .meta
1471            .insert("_dd.hostname".to_string(), "my-host".to_string());
1472        child_span
1473            .meta
1474            .insert("runtime-id".to_string(), "123".to_string());
1475
1476        let result = collect_pb_trace_chunks(
1477            vec![vec![root_span, child_span]],
1478            &TracerHeaderTags::default(),
1479            &mut tracer_payload::DefaultTraceChunkProcessor,
1480            true,
1481        )
1482        .unwrap();
1483
1484        let TracerPayloadCollection::V07(payloads) = result else {
1485            panic!("expected TracerPayloadCollection::V07");
1486        };
1487        assert_eq!(payloads[0].app_version, "1.2.3");
1488        assert_eq!(payloads[0].env, "prod");
1489        assert_eq!(payloads[0].hostname, "my-host");
1490        assert_eq!(payloads[0].runtime_id, "123");
1491    }
1492
1493    #[test]
1494    fn test_collect_pb_trace_chunks_normalizes_env() {
1495        let mut root = create_test_span(1, 1, 0, 1, true);
1496        root.meta
1497            .insert("env".to_string(), "PRODUCTION".to_string());
1498
1499        let result = collect_pb_trace_chunks(
1500            vec![vec![root]],
1501            &TracerHeaderTags::default(),
1502            &mut tracer_payload::DefaultTraceChunkProcessor,
1503            true,
1504        )
1505        .unwrap();
1506
1507        let TracerPayloadCollection::V07(payloads) = result else {
1508            panic!("expected TracerPayloadCollection::V07");
1509        };
1510        assert_eq!(payloads[0].env, "production");
1511    }
1512
1513    #[test]
1514    fn test_collect_pb_trace_chunks_skips_env_empty_after_normalization() {
1515        // First root span has an env that normalizes to empty (all invalid characters).
1516        // Second root span has an env should populate env fields.
1517        let mut first_root_span = create_test_span(1, 1, 0, 1, true);
1518        first_root_span
1519            .meta
1520            .insert("env".to_string(), "!!!".to_string());
1521
1522        let mut second_root_span = create_test_span(2, 3, 0, 1, true);
1523        second_root_span
1524            .meta
1525            .insert("env".to_string(), "prod".to_string());
1526
1527        let result = collect_pb_trace_chunks(
1528            vec![vec![first_root_span], vec![second_root_span]],
1529            &TracerHeaderTags::default(),
1530            &mut tracer_payload::DefaultTraceChunkProcessor,
1531            true,
1532        )
1533        .unwrap();
1534
1535        let TracerPayloadCollection::V07(payloads) = result else {
1536            panic!("expected TracerPayloadCollection::V07");
1537        };
1538        assert_eq!(payloads[0].env, "prod");
1539    }
1540
1541    #[test]
1542    fn test_search_trace_for_field_skips_span_with_same_id_as_root() {
1543        // A span with the same span_id as root is treated as the root and skipped
1544        // in the child span search. Only the root spans own meta is checked for it.
1545        let mut root = create_test_span(1, 1, 0, 1, true);
1546        root.meta.remove("version");
1547
1548        // This span shares the same span_id as the root span, it should be skipped.
1549        let mut duplicate = create_test_span(1, 1, 0, 1, false);
1550        duplicate
1551            .meta
1552            .insert("version".to_string(), "should-not-appear".to_string());
1553
1554        let trace = vec![root.clone(), duplicate];
1555        assert_eq!(search_trace_for_field(&root, &trace, "version"), None);
1556    }
1557}