Skip to main content

libdd_trace_utils/
trace_utils.rs

1// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4pub use crate::send_data::send_data_result::SendDataResult;
5pub use crate::send_data::SendData;
6use crate::span::v05::dict::SharedDict;
7use crate::span::{v05, TraceData};
8pub use crate::tracer_header_tags::TracerHeaderTags;
9use crate::tracer_payload::TracerPayloadCollection;
10use crate::tracer_payload::{self, TraceChunks};
11use anyhow::anyhow;
12use bytes::buf::Reader;
13use bytes::Buf;
14use http_body_util::BodyExt;
15use libdd_common::azure_app_services;
16use libdd_trace_normalization::normalizer;
17use libdd_trace_protobuf::pb;
18use rmp::decode::read_array_len;
19use rmpv::decode::read_value;
20use rmpv::{Integer, Value};
21use std::cmp::Ordering;
22use std::collections::{HashMap, HashSet};
23use std::env;
24use tracing::{debug, error};
25
/// The maximum payload size for a single request that can be sent to the trace agent. Payloads
/// larger than this size will be dropped and the agent will return a 413 error if
/// `datadog-send-real-http-status` is set.
pub const MAX_PAYLOAD_SIZE: usize = 25 * 1024 * 1024;
/// Span metric the mini agent must set for the backend to recognize top level span
const TOP_LEVEL_KEY: &str = "_top_level";
/// Span metric the tracer sets to denote a top level span
const TRACER_TOP_LEVEL_KEY: &str = "_dd.top_level";
/// Span metric checked by `is_measured`: a value of `1.0` marks the span as measured.
const MEASURED_KEY: &str = "_dd.measured";
/// Span metric checked by `is_partial_snapshot` to detect partial snapshots of a span.
const PARTIAL_VERSION_KEY: &str = "_dd.partial_version";
/// Largest number of entries accepted in a v0.5 strings dictionary; larger dictionaries are
/// rejected by `get_v05_strings_dict`.
const MAX_STRING_DICT_SIZE: u32 = 25_000_000;
/// Exact number of array elements a v0.5 encoded span must have (checked by `get_v05_span`).
const SPAN_ELEMENT_COUNT: usize = 12;
38
39/// First value of returned tuple is the payload size
40pub async fn get_traces_from_request_body<B>(body: B) -> anyhow::Result<(usize, Vec<Vec<pb::Span>>)>
41where
42    B: http_body::Body,
43    B::Error: std::error::Error + Send + Sync + 'static,
44{
45    let buffer = body.collect().await?.aggregate();
46    let size = buffer.remaining();
47
48    let traces: Vec<Vec<pb::Span>> = match rmp_serde::from_read(buffer.reader()) {
49        Ok(res) => res,
50        Err(err) => {
51            anyhow::bail!("Error deserializing trace from request body: {err}")
52        }
53    };
54
55    Ok((size, traces))
56}
57
58#[inline]
59fn get_v05_strings_dict(reader: &mut Reader<impl Buf>) -> anyhow::Result<Vec<String>> {
60    let dict_size =
61        read_array_len(reader).map_err(|err| anyhow!("Error reading dict size: {err}"))?;
62    if dict_size > MAX_STRING_DICT_SIZE {
63        anyhow::bail!(
64            "Error deserializing strings dictionary. Dict size is too large: {dict_size}"
65        );
66    }
67    let mut dict: Vec<String> = Vec::with_capacity(dict_size.try_into()?);
68    for _ in 0..dict_size {
69        match read_value(reader)? {
70            Value::String(s) => {
71                let parsed_string = s.into_str().ok_or_else(|| anyhow!("Error reading string dict"))?;
72                dict.push(parsed_string);
73            }
74            val => anyhow::bail!("Error deserializing strings dictionary. Value in string dict is not a string: {val}")
75        }
76    }
77    Ok(dict)
78}
79
80#[inline]
81fn get_v05_span(reader: &mut Reader<impl Buf>, dict: &[String]) -> anyhow::Result<pb::Span> {
82    let mut span: pb::Span = Default::default();
83    let span_size = rmp::decode::read_array_len(reader)
84        .map_err(|err| anyhow!("Error reading span size: {err}"))? as usize;
85    if span_size != SPAN_ELEMENT_COUNT {
86        anyhow::bail!("Expected an array of exactly 12 elements in a span, got {span_size}");
87    }
88    // 0 - service
89    span.service = get_v05_string(reader, dict, "service")?;
90    // 1 - name
91    span.name = get_v05_string(reader, dict, "name")?;
92    // 2 - resource
93    span.resource = get_v05_string(reader, dict, "resource")?;
94
95    // 3 - trace_id
96    match read_value(reader)? {
97        Value::Integer(i) => {
98            span.trace_id = i.as_u64().ok_or_else(|| {
99                anyhow!("Error reading span trace_id, value is not an integer: {i}")
100            })?;
101        }
102        val => anyhow::bail!("Error reading span trace_id, value is not an integer: {val}"),
103    };
104    // 4 - span_id
105    match read_value(reader)? {
106        Value::Integer(i) => {
107            span.span_id = i.as_u64().ok_or_else(|| {
108                anyhow!("Error reading span span_id, value is not an integer: {i}")
109            })?;
110        }
111        val => anyhow::bail!("Error reading span span_id, value is not an integer: {val}"),
112    };
113    // 5 - parent_id
114    match read_value(reader)? {
115        Value::Integer(i) => {
116            span.parent_id = i.as_u64().ok_or_else(|| {
117                anyhow!("Error reading span parent_id, value is not an integer: {i}")
118            })?;
119        }
120        val => anyhow::bail!("Error reading span parent_id, value is not an integer: {val}"),
121    };
122    // 6 - start
123    match read_value(reader)? {
124        Value::Integer(i) => {
125            span.start = i
126                .as_i64()
127                .ok_or_else(|| anyhow!("Error reading span start, value is not an integer: {i}"))?;
128        }
129        val => anyhow::bail!("Error reading span start, value is not an integer: {val}"),
130    };
131    // 7 - duration
132    match read_value(reader)? {
133        Value::Integer(i) => {
134            span.duration = i.as_i64().ok_or_else(|| {
135                anyhow!("Error reading span duration, value is not an integer: {i}")
136            })?;
137        }
138        val => anyhow::bail!("Error reading span duration, value is not an integer: {val}"),
139    };
140    // 8 - error
141    match read_value(reader)? {
142        Value::Integer(i) => {
143            span.error = i
144                .as_i64()
145                .ok_or_else(|| anyhow!("Error reading span error, value is not an integer: {i}"))?
146                as i32;
147        }
148        val => anyhow::bail!("Error reading span error, value is not an integer: {val}"),
149    }
150    // 9 - meta
151    match read_value(reader)? {
152        Value::Map(meta) => {
153            for (k, v) in meta.iter() {
154                match k {
155                    Value::Integer(k) => {
156                        match v {
157                            Value::Integer(v) => {
158                                let key = str_from_dict(dict, *k)?;
159                                let val = str_from_dict(dict, *v)?;
160                                span.meta.insert(key, val);
161                            }
162                            _ => anyhow::bail!("Error reading span meta, value is not an integer and can't be looked up in dict: {v}")
163                        }
164                    }
165                    _ => anyhow::bail!("Error reading span meta, key is not an integer and can't be looked up in dict: {k}")
166                }
167            }
168        }
169        val => anyhow::bail!("Error reading span meta, value is not a map: {val}"),
170    }
171    // 10 - metrics
172    match read_value(reader)? {
173        Value::Map(metrics) => {
174            for (k, v) in metrics.iter() {
175                match k {
176                    Value::Integer(k) => {
177                        match v {
178                            Value::Integer(v) => {
179                                let key = str_from_dict(dict, *k)?;
180                                span.metrics.insert(key, v.as_f64().ok_or_else(||anyhow!("Error reading span metrics, value is not an integer: {v}"))?);
181                            }
182                            Value::F64(v) => {
183                                let key = str_from_dict(dict, *k)?;
184                                span.metrics.insert(key, *v);
185                            }
186                            _ => anyhow::bail!(
187                                "Error reading span metrics, value is not a float or integer: {v}"
188                            ),
189                        }
190                    }
191                    _ => anyhow::bail!("Error reading span metrics, key is not an integer: {k}"),
192                }
193            }
194        }
195        val => anyhow::bail!("Error reading span metrics, value is not a map: {val}"),
196    }
197
198    // 11 - type
199    match read_value(reader)? {
200        Value::Integer(s) => span.r#type = str_from_dict(dict, s)?,
201        val => anyhow::bail!("Error reading span type, value is not an integer: {val}"),
202    }
203    Ok(span)
204}
205
206#[inline]
207fn str_from_dict(dict: &[String], id: Integer) -> anyhow::Result<String> {
208    let id = id
209        .as_i64()
210        .ok_or_else(|| anyhow!("Error reading string from dict, id is not an integer: {id}"))?
211        as usize;
212    if id >= dict.len() {
213        anyhow::bail!("Error reading string from dict, id out of bounds: {id}");
214    }
215    Ok(dict[id].to_string())
216}
217
218#[inline]
219fn get_v05_string(
220    reader: &mut Reader<impl Buf>,
221    dict: &[String],
222    field_name: &str,
223) -> anyhow::Result<String> {
224    match read_value(reader)? {
225        Value::Integer(s) => {
226            str_from_dict(dict, s)
227        },
228        val => anyhow::bail!("Error reading {field_name}, value is not an integer and can't be looked up in dict: {val}")
229    }
230}
231
232pub async fn get_v05_traces_from_request_body<B>(
233    body: B,
234) -> anyhow::Result<(usize, Vec<Vec<pb::Span>>)>
235where
236    B: http_body::Body,
237    B::Error: std::error::Error + Send + Sync + 'static,
238{
239    let buffer = body.collect().await?.aggregate();
240    let body_size = buffer.remaining();
241    let mut reader = buffer.reader();
242    let wrapper_size = read_array_len(&mut reader)?;
243    if wrapper_size != 2 {
244        anyhow::bail!("Expected an arrary of exactly 2 elements, got {wrapper_size}");
245    }
246
247    let dict = get_v05_strings_dict(&mut reader)?;
248
249    let traces_size = rmp::decode::read_array_len(&mut reader)?;
250    let mut traces: Vec<Vec<pb::Span>> = Default::default();
251
252    for _ in 0..traces_size {
253        let spans_size = rmp::decode::read_array_len(&mut reader)?;
254        let mut trace: Vec<pb::Span> = Default::default();
255
256        for _ in 0..spans_size {
257            let span = get_v05_span(&mut reader, &dict)?;
258            trace.push(span);
259        }
260        traces.push(trace);
261    }
262    Ok((body_size, traces))
263}
264
/// Tags gathered from a trace's root span
///
/// Each field borrows its value from the root span's meta map and defaults to the empty
/// string when the corresponding tag is absent.
#[derive(Default)]
pub struct RootSpanTags<'a> {
    /// Value of the `env` meta tag.
    pub env: &'a str,
    /// Value of the `version` meta tag.
    pub app_version: &'a str,
    /// Value of the `_dd.hostname` meta tag.
    pub hostname: &'a str,
    /// Value of the `runtime-id` meta tag.
    pub runtime_id: &'a str,
}
273
274pub(crate) fn construct_trace_chunk(trace: Vec<pb::Span>) -> pb::TraceChunk {
275    pb::TraceChunk {
276        priority: normalizer::SamplerPriority::None as i32,
277        origin: "".to_string(),
278        spans: trace,
279        tags: HashMap::new(),
280        dropped_trace: false,
281    }
282}
283
284pub(crate) fn construct_tracer_payload(
285    chunks: Vec<pb::TraceChunk>,
286    tracer_tags: &TracerHeaderTags,
287    root_span_tags: RootSpanTags,
288) -> pb::TracerPayload {
289    pb::TracerPayload {
290        app_version: root_span_tags.app_version.to_string(),
291        language_name: tracer_tags.lang.to_string(),
292        container_id: tracer_tags.container_id.to_string(),
293        env: root_span_tags.env.to_string(),
294        runtime_id: root_span_tags.runtime_id.to_string(),
295        chunks,
296        hostname: root_span_tags.hostname.to_string(),
297        language_version: tracer_tags.lang_version.to_string(),
298        tags: HashMap::new(),
299        tracer_version: tracer_tags.tracer_version.to_string(),
300    }
301}
302
303pub(crate) fn cmp_send_data_payloads(a: &pb::TracerPayload, b: &pb::TracerPayload) -> Ordering {
304    a.tracer_version
305        .cmp(&b.tracer_version)
306        .then(a.language_version.cmp(&b.language_version))
307        .then(a.language_name.cmp(&b.language_name))
308        .then(a.hostname.cmp(&b.hostname))
309        .then(a.container_id.cmp(&b.container_id))
310        .then(a.runtime_id.cmp(&b.runtime_id))
311        .then(a.env.cmp(&b.env))
312        .then(a.app_version.cmp(&b.app_version))
313}
314
315pub fn coalesce_send_data(mut data: Vec<SendData>) -> Vec<SendData> {
316    // TODO trace payloads with identical data except for chunk could be merged?
317
318    data.sort_unstable_by(|a, b| {
319        a.get_target()
320            .url
321            .to_string()
322            .cmp(&b.get_target().url.to_string())
323            .then(a.get_target().test_token.cmp(&b.get_target().test_token))
324    });
325    data.dedup_by(|a, b| {
326        if a.get_target().url == b.get_target().url
327            && a.get_target().test_token == b.get_target().test_token
328        {
329            // Size is only an approximation. In practice it won't vary much, but be safe here.
330            // We also don't care about the exact maximum size, like two 25 MB or one 50 MB request
331            // has similar results. The primary goal here is avoiding many small requests.
332            // TODO: maybe make the MAX_PAYLOAD_SIZE configurable?
333            if a.size + b.size < MAX_PAYLOAD_SIZE / 2 {
334                // Note: dedup_by drops a, and retains b.
335                b.tracer_payloads.append(&mut a.tracer_payloads);
336                b.size += a.size;
337                return true;
338            }
339        }
340        false
341    });
342    // Merge chunks with common properties. Reduces requests for agentful mode.
343    // And reduces a little bit of data for agentless.
344    for send_data in data.iter_mut() {
345        send_data.tracer_payloads.merge();
346    }
347    data
348}
349
350pub fn get_root_span_index(trace: &[pb::Span]) -> anyhow::Result<usize> {
351    if trace.is_empty() {
352        anyhow::bail!("Cannot find root span index in an empty trace.");
353    }
354
355    // Do a first pass to find if we have an obvious root span (starting from the end) since some
356    // clients put the root span last.
357    for (i, span) in trace.iter().enumerate().rev() {
358        if span.parent_id == 0 {
359            return Ok(i);
360        }
361    }
362
363    let mut span_ids: HashSet<u64> = HashSet::with_capacity(trace.len());
364    for span in trace.iter() {
365        span_ids.insert(span.span_id);
366    }
367
368    let mut root_span_id = None;
369    for (i, span) in trace.iter().enumerate() {
370        // If a span's parent is not in the trace, it is a root
371        if !span_ids.contains(&span.parent_id) {
372            if root_span_id.is_some() {
373                debug!(
374                    trace_id = &trace[0].trace_id,
375                    "trace has multiple root spans"
376                );
377            }
378            root_span_id = Some(i);
379        }
380    }
381    Ok(match root_span_id {
382        Some(i) => i,
383        None => {
384            debug!(
385                trace_id = &trace[0].trace_id,
386                "Could not find the root span for trace"
387            );
388            trace.len() - 1
389        }
390    })
391}
392
393/// Updates all the spans top-level attribute.
394/// A span is considered top-level if:
395///   - it's a root span
396///   - OR its parent is unknown (other part of the code, distributed trace)
397///   - OR its parent belongs to another service (in that case it's a "local root" being the highest
398///     ancestor of other spans belonging to this service and attached to it).
399pub fn compute_top_level_span(trace: &mut [pb::Span]) {
400    let mut span_id_to_service: HashMap<u64, String> = HashMap::new();
401    for span in trace.iter() {
402        span_id_to_service.insert(span.span_id, span.service.clone());
403    }
404    for span in trace.iter_mut() {
405        if span.parent_id == 0 {
406            set_top_level_span(span, true);
407            continue;
408        }
409        match span_id_to_service.get(&span.parent_id) {
410            Some(parent_span_service) => {
411                if !parent_span_service.eq(&span.service) {
412                    // parent is not in the same service
413                    set_top_level_span(span, true)
414                }
415            }
416            None => {
417                // span has no parent in chunk
418                set_top_level_span(span, true)
419            }
420        }
421    }
422}
423
424/// Return true if the span has a top level key set
425pub fn has_top_level(span: &pb::Span) -> bool {
426    span.metrics
427        .get(TRACER_TOP_LEVEL_KEY)
428        .is_some_and(|v| *v == 1.0)
429        || span.metrics.get(TOP_LEVEL_KEY).is_some_and(|v| *v == 1.0)
430}
431
432fn set_top_level_span(span: &mut pb::Span, is_top_level: bool) {
433    if is_top_level {
434        span.metrics.insert(TOP_LEVEL_KEY.to_string(), 1.0);
435    } else {
436        span.metrics.remove(TOP_LEVEL_KEY);
437    }
438}
439
440pub fn set_serverless_root_span_tags(
441    span: &mut pb::Span,
442    app_name: Option<String>,
443    env_type: &EnvironmentType,
444) {
445    let origin_tag = match env_type {
446        EnvironmentType::CloudFunction => "cloudfunction",
447        EnvironmentType::AzureFunction => "azurefunction",
448        EnvironmentType::AzureSpringApp => "azurespringapp",
449        EnvironmentType::LambdaFunction => "lambda", // historical reasons
450    };
451    span.meta
452        .insert("_dd.origin".to_string(), origin_tag.to_string());
453    span.meta
454        .insert("origin".to_string(), origin_tag.to_string());
455
456    if let Some(function_name) = app_name {
457        match env_type {
458            EnvironmentType::CloudFunction
459            | EnvironmentType::AzureFunction
460            | EnvironmentType::LambdaFunction => {
461                span.meta.insert("functionname".to_string(), function_name);
462            }
463            _ => {}
464        }
465    }
466}
467
468fn update_tracer_top_level(span: &mut pb::Span) {
469    if span.metrics.contains_key(TRACER_TOP_LEVEL_KEY) {
470        span.metrics.insert(TOP_LEVEL_KEY.to_string(), 1.0);
471    }
472}
473
/// Serverless environment a span originates from.
///
/// `set_serverless_root_span_tags` maps each variant to the origin tag written on the root
/// span.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum EnvironmentType {
    /// Tagged with the `cloudfunction` origin.
    CloudFunction,
    /// Tagged with the `azurefunction` origin.
    AzureFunction,
    /// Tagged with the `azurespringapp` origin.
    AzureSpringApp,
    /// Tagged with the `lambda` origin.
    LambdaFunction,
}
481
/// Optional environment metadata used to enrich spans with additional meta tags.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct MiniAgentMetadata {
    /// Written to the `asa.hostname` span tag when present.
    pub azure_spring_app_hostname: Option<String>,
    /// Written to the `asa.name` span tag when present.
    pub azure_spring_app_name: Option<String>,
    /// Google Cloud project id, used to build the `gcrfx.*` span tags.
    pub gcp_project_id: Option<String>,
    /// Google Cloud region, used to build the `gcrfx.*` span tags.
    pub gcp_region: Option<String>,
    /// Written to the `_dd.serverless_compat_version` span tag when present.
    pub version: Option<String>,
}
490
491impl Default for MiniAgentMetadata {
492    fn default() -> Self {
493        MiniAgentMetadata {
494            azure_spring_app_hostname: Default::default(),
495            azure_spring_app_name: Default::default(),
496            gcp_project_id: Default::default(),
497            gcp_region: Default::default(),
498            version: env::var("DD_SERVERLESS_COMPAT_VERSION").ok(),
499        }
500    }
501}
502
503pub fn enrich_span_with_mini_agent_metadata(
504    span: &mut pb::Span,
505    mini_agent_metadata: &MiniAgentMetadata,
506) {
507    if let Some(azure_spring_app_hostname) = &mini_agent_metadata.azure_spring_app_hostname {
508        span.meta.insert(
509            "asa.hostname".to_string(),
510            azure_spring_app_hostname.to_string(),
511        );
512    }
513    if let Some(azure_spring_app_name) = &mini_agent_metadata.azure_spring_app_name {
514        span.meta
515            .insert("asa.name".to_string(), azure_spring_app_name.to_string());
516    }
517    if let Some(serverless_compat_version) = &mini_agent_metadata.version {
518        span.meta.insert(
519            "_dd.serverless_compat_version".to_string(),
520            serverless_compat_version.to_string(),
521        );
522    }
523}
524
525pub fn enrich_span_with_google_cloud_function_metadata(
526    span: &mut pb::Span,
527    mini_agent_metadata: &MiniAgentMetadata,
528    function: Option<String>,
529) {
530    #[allow(clippy::todo)]
531    let Some(region) = &mini_agent_metadata.gcp_region
532    else {
533        todo!()
534    };
535    #[allow(clippy::todo)]
536    let Some(project) = &mini_agent_metadata.gcp_project_id
537    else {
538        todo!()
539    };
540
541    if let Some(function) = function {
542        if !region.is_empty() && !project.is_empty() {
543            let resource_name = format!(
544                "projects/{}/locations/{}/functions/{}",
545                project, region, function
546            );
547
548            span.meta
549                .insert("gcrfx.location".to_string(), region.to_string());
550            span.meta
551                .insert("gcrfx.project_id".to_string(), project.to_string());
552            span.meta
553                .insert("gcrfx.resource_name".to_string(), resource_name.to_string());
554        }
555    }
556}
557
558pub fn enrich_span_with_azure_function_metadata(span: &mut pb::Span) {
559    if span.name == "azure.apim" {
560        return;
561    }
562
563    if let Some(aas_metadata) = &*azure_app_services::AAS_METADATA_FUNCTION {
564        span.meta.extend(
565            aas_metadata
566                .get_function_tags()
567                .map(|(name, value)| (name.to_string(), value.to_string())),
568        );
569    }
570}
571
/// Used to populate root_span_tags fields if they exist in the root span's meta tags
///
/// Takes the meta map identifier followed by a braced list of `"tag" => target.field,`
/// entries. Each target is assigned only when the map contains the tag; otherwise it keeps
/// its current value.
macro_rules! parse_root_span_tags {
    (
        $meta:ident,
        { $($key:literal => $($target:ident).+ ,)+ }
    ) => {
        $(
            if let Some(found) = $meta.get($key) {
                $($target).+ = found;
            }
        )+
    }
}
585
586pub fn collect_trace_chunks<T: TraceData>(
587    traces: Vec<Vec<crate::span::v04::Span<T>>>,
588    use_v05_format: bool,
589) -> anyhow::Result<TraceChunks<T>> {
590    if use_v05_format {
591        let mut shared_dict = SharedDict::default();
592        let mut v05_traces: Vec<Vec<v05::Span>> = Vec::with_capacity(traces.len());
593        for trace in traces {
594            let trace_len = trace.len();
595            let v05_trace = trace.into_iter().try_fold(
596                Vec::with_capacity(trace_len),
597                |mut acc, span| -> anyhow::Result<Vec<v05::Span>> {
598                    acc.push(v05::from_v04_span(span, &mut shared_dict)?);
599                    Ok(acc)
600                },
601            )?;
602
603            v05_traces.push(v05_trace);
604        }
605        Ok(TraceChunks::V05((shared_dict, v05_traces)))
606    } else {
607        Ok(TraceChunks::V04(traces))
608    }
609}
610
611pub fn collect_pb_trace_chunks<T: tracer_payload::TraceChunkProcessor>(
612    mut traces: Vec<Vec<pb::Span>>,
613    tracer_header_tags: &TracerHeaderTags,
614    process_chunk: &mut T,
615    is_agentless: bool,
616) -> anyhow::Result<TracerPayloadCollection> {
617    let mut trace_chunks: Vec<pb::TraceChunk> = Vec::new();
618
619    // We'll skip setting the global metadata and rely on the agent to unpack these
620    let mut gathered_root_span_tags = !is_agentless;
621    let mut root_span_tags = RootSpanTags::default();
622
623    for trace in traces.iter_mut() {
624        if is_agentless {
625            if let Err(e) = normalizer::normalize_trace(trace) {
626                error!("Error normalizing trace: {e}");
627            }
628        }
629
630        let mut chunk = construct_trace_chunk(trace.to_vec());
631
632        let root_span_index = match get_root_span_index(trace) {
633            Ok(res) => res,
634            Err(e) => {
635                error!("Error getting the root span index of a trace, skipping. {e}");
636                continue;
637            }
638        };
639
640        if let Err(e) = normalizer::normalize_chunk(&mut chunk, root_span_index) {
641            error!("Error normalizing trace chunk: {e}");
642        }
643
644        for span in chunk.spans.iter_mut() {
645            // TODO: obfuscate & truncate spans
646            if tracer_header_tags.client_computed_top_level {
647                update_tracer_top_level(span);
648            }
649        }
650
651        if !tracer_header_tags.client_computed_top_level {
652            compute_top_level_span(&mut chunk.spans);
653        }
654
655        process_chunk.process(&mut chunk, root_span_index);
656
657        trace_chunks.push(chunk);
658
659        if !gathered_root_span_tags {
660            gathered_root_span_tags = true;
661            let meta_map = &trace[root_span_index].meta;
662            parse_root_span_tags!(
663                meta_map,
664                {
665                    "env" => root_span_tags.env,
666                    "version" => root_span_tags.app_version,
667                    "_dd.hostname" => root_span_tags.hostname,
668                    "runtime-id" => root_span_tags.runtime_id,
669                }
670            );
671        }
672    }
673
674    Ok(TracerPayloadCollection::V07(vec![
675        construct_tracer_payload(trace_chunks, tracer_header_tags, root_span_tags),
676    ]))
677}
678
679/// Returns true if a span should be measured (i.e., it should get trace metrics calculated).
680pub fn is_measured(span: &pb::Span) -> bool {
681    span.metrics.get(MEASURED_KEY).is_some_and(|v| *v == 1.0)
682}
683
684/// Returns true if the span is a partial snapshot.
685/// This kind of spans are partial images of long-running spans.
686/// When incomplete, a partial snapshot has a metric _dd.partial_version which is a positive
687/// integer. The metric usually increases each time a new version of the same span is sent by the
688/// tracer
689pub fn is_partial_snapshot(span: &pb::Span) -> bool {
690    span.metrics
691        .get(PARTIAL_VERSION_KEY)
692        .is_some_and(|v| *v >= 0.0)
693}
694
695#[cfg(test)]
696mod tests {
697    use super::*;
698    use crate::{
699        span::SharedDictBytes,
700        test_utils::{create_test_no_alloc_span, create_test_span},
701    };
702    use hyper::Request;
703    use libdd_common::http_common;
704    use libdd_common::Endpoint;
705    use serde_json::json;
706
707    fn find_index_in_dict(dict: &SharedDictBytes, value: &str) -> Option<u32> {
708        let idx = dict.iter().position(|e| e.as_str() == value);
709        idx.map(|idx| idx.try_into().unwrap())
710    }
711
712    #[test]
713    fn test_coalescing_does_not_exceed_max_size() {
714        fn dummy() -> SendData {
715            SendData::new(
716                MAX_PAYLOAD_SIZE / 5 + 1,
717                TracerPayloadCollection::V07(vec![pb::TracerPayload {
718                    container_id: "".to_string(),
719                    language_name: "".to_string(),
720                    language_version: "".to_string(),
721                    tracer_version: "".to_string(),
722                    runtime_id: "".to_string(),
723                    chunks: vec![pb::TraceChunk {
724                        priority: 0,
725                        origin: "".to_string(),
726                        spans: vec![],
727                        tags: Default::default(),
728                        dropped_trace: false,
729                    }],
730                    tags: Default::default(),
731                    env: "".to_string(),
732                    hostname: "".to_string(),
733                    app_version: "".to_string(),
734                }]),
735                TracerHeaderTags::default(),
736                &Endpoint::default(),
737            )
738        }
739        let coalesced = coalesce_send_data(vec![dummy(), dummy(), dummy(), dummy(), dummy()]);
740        assert_eq!(
741            5,
742            coalesced
743                .iter()
744                .map(|s| s.tracer_payloads.size())
745                .sum::<usize>()
746        );
747        // assert some chunks are actually coalesced
748        assert!(
749            coalesced
750                .iter()
751                .map(|s| {
752                    if let TracerPayloadCollection::V07(collection) = &s.tracer_payloads {
753                        collection.iter().map(|s| s.chunks.len()).max().unwrap()
754                    } else {
755                        0
756                    }
757                })
758                .max()
759                .unwrap()
760                > 1
761        );
762        assert!(coalesced.len() > 1 && coalesced.len() < 5);
763    }
764
765    #[tokio::test]
766    #[allow(clippy::type_complexity)]
767    #[cfg_attr(all(miri, target_os = "macos"), ignore)]
768    async fn test_get_v05_traces_from_request_body() {
769        let data: (
770            Vec<String>,
771            Vec<
772                Vec<(
773                    u8,
774                    u8,
775                    u8,
776                    u64,
777                    u64,
778                    u64,
779                    i64,
780                    i64,
781                    i32,
782                    HashMap<u8, u8>,
783                    HashMap<u8, f64>,
784                    u8,
785                )>,
786            >,
787        ) = (
788            vec![
789                "baggage".to_string(),
790                "item".to_string(),
791                "elasticsearch.version".to_string(),
792                "7.0".to_string(),
793                "my-name".to_string(),
794                "X".to_string(),
795                "my-service".to_string(),
796                "my-resource".to_string(),
797                "_dd.sampling_rate_whatever".to_string(),
798                "value whatever".to_string(),
799                "sql".to_string(),
800            ],
801            vec![vec![(
802                6,
803                4,
804                7,
805                1,
806                2,
807                3,
808                123,
809                456,
810                1,
811                HashMap::from([(8, 9), (0, 1), (2, 3)]),
812                HashMap::from([(5, 1.2)]),
813                10,
814            )]],
815        );
816        let bytes = rmp_serde::to_vec(&data).unwrap();
817        let res = get_v05_traces_from_request_body(http_common::Body::from(bytes)).await;
818        assert!(res.is_ok());
819        let (_, traces) = res.unwrap();
820        let span = traces[0][0].clone();
821        let test_span = pb::Span {
822            service: "my-service".to_string(),
823            name: "my-name".to_string(),
824            resource: "my-resource".to_string(),
825            trace_id: 1,
826            span_id: 2,
827            parent_id: 3,
828            start: 123,
829            duration: 456,
830            error: 1,
831            meta: HashMap::from([
832                ("baggage".to_string(), "item".to_string()),
833                ("elasticsearch.version".to_string(), "7.0".to_string()),
834                (
835                    "_dd.sampling_rate_whatever".to_string(),
836                    "value whatever".to_string(),
837                ),
838            ]),
839            metrics: HashMap::from([("X".to_string(), 1.2)]),
840            meta_struct: HashMap::default(),
841            r#type: "sql".to_string(),
842            span_links: vec![],
843            span_events: vec![],
844        };
845        assert_eq!(span, test_span);
846    }
847
848    #[tokio::test]
849    #[cfg_attr(miri, ignore)]
850    async fn test_get_traces_from_request_body() {
851        let pairs = vec![
852            (
853                json!([{
854                    "service": "test-service",
855                    "name": "test-service-name",
856                    "resource": "test-service-resource",
857                    "trace_id": 111,
858                    "span_id": 222,
859                    "parent_id": 333,
860                    "start": 1,
861                    "duration": 5,
862                    "error": 0,
863                    "meta": {},
864                    "metrics": {},
865                }]),
866                vec![vec![pb::Span {
867                    service: "test-service".to_string(),
868                    name: "test-service-name".to_string(),
869                    resource: "test-service-resource".to_string(),
870                    trace_id: 111,
871                    span_id: 222,
872                    parent_id: 333,
873                    start: 1,
874                    duration: 5,
875                    error: 0,
876                    meta: HashMap::new(),
877                    metrics: HashMap::new(),
878                    meta_struct: HashMap::new(),
879                    r#type: "".to_string(),
880                    span_links: vec![],
881                    span_events: vec![],
882                }]],
883            ),
884            (
885                json!([{
886                    "name": "test-service-name",
887                    "resource": "test-service-resource",
888                    "trace_id": 111,
889                    "span_id": 222,
890                    "start": 1,
891                    "duration": 5,
892                    "meta": {},
893                }]),
894                vec![vec![pb::Span {
895                    service: "".to_string(),
896                    name: "test-service-name".to_string(),
897                    resource: "test-service-resource".to_string(),
898                    trace_id: 111,
899                    span_id: 222,
900                    parent_id: 0,
901                    start: 1,
902                    duration: 5,
903                    error: 0,
904                    meta: HashMap::new(),
905                    metrics: HashMap::new(),
906                    meta_struct: HashMap::new(),
907                    r#type: "".to_string(),
908                    span_links: vec![],
909                    span_events: vec![],
910                }]],
911            ),
912        ];
913
914        for (trace_input, output) in pairs {
915            let bytes = rmp_serde::to_vec(&vec![&trace_input]).unwrap();
916            let request = Request::builder()
917                .body(http_common::Body::from(bytes))
918                .unwrap();
919            let res = get_traces_from_request_body(request.into_body()).await;
920            assert!(res.is_ok());
921            assert_eq!(res.unwrap().1, output);
922        }
923    }
924
925    #[tokio::test]
926    #[cfg_attr(miri, ignore)]
927    async fn test_get_traces_from_request_body_with_span_links() {
928        let trace_input = json!([[{
929            "service": "test-service",
930            "name": "test-name",
931            "resource": "test-resource",
932            "trace_id": 111,
933            "span_id": 222,
934            "parent_id": 333,
935            "start": 1,
936            "duration": 5,
937            "error": 0,
938            "meta": {},
939            "metrics": {},
940            "span_links": [{
941                "trace_id": 999,
942                "span_id": 888,
943                "trace_id_high": 777,
944                "attributes": {"key": "value"},
945                "tracestate": "vendor=value"
946                // flags field intentionally omitted
947            }]
948        }]]);
949
950        let expected_output = vec![vec![pb::Span {
951            service: "test-service".to_string(),
952            name: "test-name".to_string(),
953            resource: "test-resource".to_string(),
954            trace_id: 111,
955            span_id: 222,
956            parent_id: 333,
957            start: 1,
958            duration: 5,
959            error: 0,
960            meta: HashMap::new(),
961            metrics: HashMap::new(),
962            meta_struct: HashMap::new(),
963            r#type: String::new(),
964            span_links: vec![pb::SpanLink {
965                trace_id: 999,
966                span_id: 888,
967                trace_id_high: 777,
968                attributes: HashMap::from([("key".to_string(), "value".to_string())]),
969                tracestate: "vendor=value".to_string(),
970                flags: 0, // Should default to 0 when omitted
971            }],
972            span_events: vec![],
973        }]];
974
975        let bytes = rmp_serde::to_vec(&trace_input).unwrap();
976        let request = Request::builder()
977            .body(http_common::Body::from(bytes))
978            .unwrap();
979
980        let res = get_traces_from_request_body(request.into_body()).await;
981        assert!(res.is_ok(), "Failed to deserialize: {res:?}");
982        assert_eq!(res.unwrap().1, expected_output);
983    }
984
985    #[test]
986    fn test_get_root_span_index_from_complete_trace() {
987        let trace = vec![
988            create_test_span(1234, 12341, 0, 1, false),
989            create_test_span(1234, 12342, 12341, 1, false),
990            create_test_span(1234, 12343, 12342, 1, false),
991        ];
992
993        let root_span_index = get_root_span_index(&trace);
994        assert!(root_span_index.is_ok());
995        assert_eq!(root_span_index.unwrap(), 0);
996    }
997
998    #[test]
999    fn test_get_root_span_index_from_partial_trace() {
1000        let trace = vec![
1001            create_test_span(1234, 12342, 12341, 1, false),
1002            create_test_span(1234, 12341, 12340, 1, false), /* this is the root span, it's
1003                                                             * parent is not in the trace */
1004            create_test_span(1234, 12343, 12342, 1, false),
1005        ];
1006
1007        let root_span_index = get_root_span_index(&trace);
1008        assert!(root_span_index.is_ok());
1009        assert_eq!(root_span_index.unwrap(), 1);
1010    }
1011
1012    #[test]
1013    fn test_set_serverless_root_span_tags_azure_function() {
1014        let mut span = create_test_span(1234, 12342, 12341, 1, false);
1015        set_serverless_root_span_tags(
1016            &mut span,
1017            Some("test_function".to_string()),
1018            &EnvironmentType::AzureFunction,
1019        );
1020        assert_eq!(
1021            span.meta,
1022            HashMap::from([
1023                (
1024                    "runtime-id".to_string(),
1025                    "test-runtime-id-value".to_string()
1026                ),
1027                ("_dd.origin".to_string(), "azurefunction".to_string()),
1028                ("origin".to_string(), "azurefunction".to_string()),
1029                ("functionname".to_string(), "test_function".to_string()),
1030                ("env".to_string(), "test-env".to_string()),
1031                ("service".to_string(), "test-service".to_string())
1032            ]),
1033        );
1034    }
1035
1036    #[test]
1037    fn test_set_serverless_root_span_tags_cloud_function() {
1038        let mut span = create_test_span(1234, 12342, 12341, 1, false);
1039        set_serverless_root_span_tags(
1040            &mut span,
1041            Some("test_function".to_string()),
1042            &EnvironmentType::CloudFunction,
1043        );
1044        assert_eq!(
1045            span.meta,
1046            HashMap::from([
1047                (
1048                    "runtime-id".to_string(),
1049                    "test-runtime-id-value".to_string()
1050                ),
1051                ("_dd.origin".to_string(), "cloudfunction".to_string()),
1052                ("origin".to_string(), "cloudfunction".to_string()),
1053                ("functionname".to_string(), "test_function".to_string()),
1054                ("env".to_string(), "test-env".to_string()),
1055                ("service".to_string(), "test-service".to_string())
1056            ]),
1057        );
1058    }
1059
1060    #[test]
1061    fn test_has_top_level() {
1062        let top_level_span = create_test_span(123, 1234, 12, 1, true);
1063        let not_top_level_span = create_test_span(123, 1234, 12, 1, false);
1064        assert!(has_top_level(&top_level_span));
1065        assert!(!has_top_level(&not_top_level_span));
1066    }
1067
1068    #[test]
1069    fn test_is_measured() {
1070        let mut measured_span = create_test_span(123, 1234, 12, 1, true);
1071        measured_span.metrics.insert(MEASURED_KEY.into(), 1.0);
1072        let not_measured_span = create_test_span(123, 1234, 12, 1, true);
1073        assert!(is_measured(&measured_span));
1074        assert!(!is_measured(&not_measured_span));
1075    }
1076
1077    #[test]
1078    fn test_compute_top_level() {
1079        let mut span_with_different_service = create_test_span(123, 5, 2, 1, false);
1080        span_with_different_service.service = "another_service".into();
1081        let mut trace = vec![
1082            // Root span, should be marked as top-level
1083            create_test_span(123, 1, 0, 1, false),
1084            // Should not be marked as top-level
1085            create_test_span(123, 2, 1, 1, false),
1086            // No parent in local trace, should be marked as
1087            // top-level
1088            create_test_span(123, 4, 3, 1, false),
1089            // Parent belongs to another service, should be marked
1090            // as top-level
1091            span_with_different_service,
1092        ];
1093
1094        compute_top_level_span(trace.as_mut_slice());
1095
1096        let spans_marked_as_top_level: Vec<u64> = trace
1097            .iter()
1098            .filter_map(|span| {
1099                if has_top_level(span) {
1100                    Some(span.span_id)
1101                } else {
1102                    None
1103                }
1104            })
1105            .collect();
1106        assert_eq!(spans_marked_as_top_level, [1, 4, 5])
1107    }
1108
1109    #[test]
1110    fn test_collect_trace_chunks_v05() {
1111        let chunk = vec![create_test_no_alloc_span(123, 456, 789, 1, true)];
1112
1113        let collection = collect_trace_chunks(vec![chunk], true).unwrap();
1114
1115        let (dict, traces) = match collection {
1116            TraceChunks::V05(payload) => payload,
1117            _ => panic!("Unexpected type"),
1118        };
1119
1120        assert_eq!(dict.len(), 16);
1121
1122        let span = &traces[0][0];
1123        assert_eq!(span.service, 1);
1124        assert_eq!(span.name, 2);
1125        assert_eq!(span.resource, 3);
1126        assert_eq!(span.trace_id, 123);
1127        assert_eq!(span.span_id, 456);
1128        assert_eq!(span.parent_id, 789);
1129        assert_eq!(span.start, 1);
1130        assert_eq!(span.error, 0);
1131        assert_eq!(span.error, 0);
1132        assert_eq!(span.r#type, 15);
1133        assert_eq!(
1134            *span
1135                .meta
1136                .get(&find_index_in_dict(&dict, "service").unwrap())
1137                .unwrap(),
1138            find_index_in_dict(&dict, "test-service").unwrap()
1139        );
1140        assert_eq!(
1141            *span
1142                .meta
1143                .get(&find_index_in_dict(&dict, "env").unwrap())
1144                .unwrap(),
1145            find_index_in_dict(&dict, "test-env").unwrap()
1146        );
1147        assert_eq!(
1148            *span
1149                .meta
1150                .get(&find_index_in_dict(&dict, "runtime-id").unwrap())
1151                .unwrap(),
1152            find_index_in_dict(&dict, "test-runtime-id-value").unwrap()
1153        );
1154        assert_eq!(
1155            *span
1156                .meta
1157                .get(&find_index_in_dict(&dict, "_dd.origin").unwrap())
1158                .unwrap(),
1159            find_index_in_dict(&dict, "cloudfunction").unwrap()
1160        );
1161        assert_eq!(
1162            *span
1163                .meta
1164                .get(&find_index_in_dict(&dict, "origin").unwrap())
1165                .unwrap(),
1166            find_index_in_dict(&dict, "cloudfunction").unwrap()
1167        );
1168        assert_eq!(
1169            *span
1170                .meta
1171                .get(&find_index_in_dict(&dict, "functionname").unwrap())
1172                .unwrap(),
1173            find_index_in_dict(&dict, "dummy_function_name").unwrap()
1174        );
1175        assert_eq!(
1176            *span
1177                .metrics
1178                .get(&find_index_in_dict(&dict, "_top_level").unwrap())
1179                .unwrap(),
1180            1.0
1181        );
1182    }
1183
1184    #[test]
1185    fn test_rmp_serde_deserialize_meta_with_null_values() {
1186        // Create a JSON representation with null value in meta
1187        let span_json = json!({
1188            "service": "test-service",
1189            "name": "test_name",
1190            "resource": "test-resource",
1191            "trace_id": 1_u64,
1192            "span_id": 2_u64,
1193            "parent_id": 0_u64,
1194            "start": 0_i64,
1195            "duration": 5_i64,
1196            "error": 0_i32,
1197            "meta": {
1198                "service": "test-service",
1199                "env": "test-env",
1200                "runtime-id": "test-runtime-id-value",
1201                "problematic_key": null  // Ensure this null value does not cause an error
1202            },
1203            "metrics": {},
1204            "type": "",
1205            "meta_struct": {},
1206            "span_links": [],
1207            "span_events": []
1208        });
1209
1210        let traces_json = vec![vec![span_json]];
1211        let encoded_data = rmp_serde::to_vec(&traces_json).unwrap();
1212        let traces: Vec<Vec<pb::Span>> = rmp_serde::from_read(&encoded_data[..])
1213            .expect("Failed to deserialize traces with null values in meta");
1214
1215        assert_eq!(1, traces.len());
1216        assert_eq!(1, traces[0].len());
1217        let decoded_span = &traces[0][0];
1218
1219        assert_eq!("test-service", decoded_span.service);
1220        assert_eq!("test_name", decoded_span.name);
1221        assert_eq!("test-resource", decoded_span.resource);
1222        assert_eq!("test-service", decoded_span.meta.get("service").unwrap());
1223        assert_eq!("test-env", decoded_span.meta.get("env").unwrap());
1224        assert_eq!(
1225            "test-runtime-id-value",
1226            decoded_span.meta.get("runtime-id").unwrap()
1227        );
1228        // Assert that the null value was filtered out (key not present in map)
1229        assert!(
1230            !decoded_span.meta.contains_key("problematic_key"),
1231            "Null value should be skipped, but key was present"
1232        );
1233    }
1234
1235    #[test]
1236    fn test_enrich_span_with_azure_function_metadata_adds_tags_for_non_apim() {
1237        let mut span = create_test_span(1234, 12342, 12341, 1, false);
1238        span.name = "azure.function".to_string();
1239
1240        enrich_span_with_azure_function_metadata(&mut span);
1241
1242        // If AAS_METADATA_FUNCTION is available, verify aas.* tags were added
1243        // If not available (most test environments), this is a no-op
1244        // This test primarily ensures the function doesn't skip non-apim spans
1245        if azure_app_services::AAS_METADATA_FUNCTION.is_some() {
1246            assert!(span.meta.contains_key("aas.resource.id"));
1247            assert!(span.meta.contains_key("aas.environment.instance_id"));
1248            assert!(span.meta.contains_key("aas.environment.instance_name"));
1249            assert!(span.meta.contains_key("aas.subscription.id"));
1250            assert!(span.meta.contains_key("aas.environment.os"));
1251            assert!(span.meta.contains_key("aas.environment.runtime"));
1252            assert!(span.meta.contains_key("aas.environment.runtime_version"));
1253            assert!(span.meta.contains_key("aas.environment.function_runtime"));
1254            assert!(span.meta.contains_key("aas.resource.group"));
1255            assert!(span.meta.contains_key("aas.site.name"));
1256            assert!(span.meta.contains_key("aas.site.kind"));
1257            assert!(span.meta.contains_key("aas.site.type"));
1258        }
1259    }
1260
1261    #[test]
1262    fn test_enrich_span_with_azure_function_metadata_skips_azure_apim() {
1263        let mut span = create_test_span(1234, 12342, 12341, 1, false);
1264        span.name = "azure.apim".to_string();
1265
1266        enrich_span_with_azure_function_metadata(&mut span);
1267
1268        // Verify no aas.* tags were added
1269        assert!(!span.meta.contains_key("aas.resource.id"));
1270        assert!(!span.meta.contains_key("aas.environment.instance_id"));
1271        assert!(!span.meta.contains_key("aas.environment.instance_name"));
1272        assert!(!span.meta.contains_key("aas.subscription.id"));
1273        assert!(!span.meta.contains_key("aas.environment.os"));
1274        assert!(!span.meta.contains_key("aas.environment.runtime"));
1275        assert!(!span.meta.contains_key("aas.environment.runtime_version"));
1276        assert!(!span.meta.contains_key("aas.environment.function_runtime"));
1277        assert!(!span.meta.contains_key("aas.resource.group"));
1278        assert!(!span.meta.contains_key("aas.site.name"));
1279        assert!(!span.meta.contains_key("aas.site.kind"));
1280        assert!(!span.meta.contains_key("aas.site.type"));
1281    }
1282}