1pub use crate::send_data::send_data_result::SendDataResult;
5pub use crate::send_data::SendData;
6use crate::span::v05::dict::SharedDict;
7use crate::span::{v05, TraceData};
8pub use crate::tracer_header_tags::TracerHeaderTags;
9use crate::tracer_payload::TracerPayloadCollection;
10use crate::tracer_payload::{self, TraceChunks};
11use anyhow::anyhow;
12use bytes::buf::Reader;
13use bytes::Buf;
14use http_body_util::BodyExt;
15use libdd_common::azure_app_services;
16use libdd_trace_normalization::normalizer;
17use libdd_trace_protobuf::pb;
18use rmp::decode::read_array_len;
19use rmpv::decode::read_value;
20use rmpv::{Integer, Value};
21use std::cmp::Ordering;
22use std::collections::{HashMap, HashSet};
23use std::env;
24use tracing::{debug, error};
25
/// Payload size budget in bytes (25 MiB). `coalesce_send_data` only merges two payloads while
/// their combined size stays below half of this value.
pub const MAX_PAYLOAD_SIZE: usize = 25 * 1024 * 1024;
/// Span metric key written by this module to flag a top-level span.
const TOP_LEVEL_KEY: &str = "_top_level";
/// Span metric key read as the tracer-provided top-level flag.
const TRACER_TOP_LEVEL_KEY: &str = "_dd.top_level";
/// Span metric key whose value `1.0` marks a span as measured.
const MEASURED_KEY: &str = "_dd.measured";
/// Span metric key inspected by `is_partial_snapshot`.
const PARTIAL_VERSION_KEY: &str = "_dd.partial_version";
/// Largest entry count accepted for a v0.5 string dictionary.
const MAX_STRING_DICT_SIZE: u32 = 25_000_000;
/// Exact number of elements a v0.5 span array must contain.
const SPAN_ELEMENT_COUNT: usize = 12;
38
39pub async fn get_traces_from_request_body<B>(body: B) -> anyhow::Result<(usize, Vec<Vec<pb::Span>>)>
41where
42 B: http_body::Body,
43 B::Error: std::error::Error + Send + Sync + 'static,
44{
45 let buffer = body.collect().await?.aggregate();
46 let size = buffer.remaining();
47
48 let traces: Vec<Vec<pb::Span>> = match rmp_serde::from_read(buffer.reader()) {
49 Ok(res) => res,
50 Err(err) => {
51 anyhow::bail!("Error deserializing trace from request body: {err}")
52 }
53 };
54
55 Ok((size, traces))
56}
57
58#[inline]
59fn get_v05_strings_dict(reader: &mut Reader<impl Buf>) -> anyhow::Result<Vec<String>> {
60 let dict_size =
61 read_array_len(reader).map_err(|err| anyhow!("Error reading dict size: {err}"))?;
62 if dict_size > MAX_STRING_DICT_SIZE {
63 anyhow::bail!(
64 "Error deserializing strings dictionary. Dict size is too large: {dict_size}"
65 );
66 }
67 let mut dict: Vec<String> = Vec::with_capacity(dict_size.try_into()?);
68 for _ in 0..dict_size {
69 match read_value(reader)? {
70 Value::String(s) => {
71 let parsed_string = s.into_str().ok_or_else(|| anyhow!("Error reading string dict"))?;
72 dict.push(parsed_string);
73 }
74 val => anyhow::bail!("Error deserializing strings dictionary. Value in string dict is not a string: {val}")
75 }
76 }
77 Ok(dict)
78}
79
80#[inline]
81fn get_v05_span(reader: &mut Reader<impl Buf>, dict: &[String]) -> anyhow::Result<pb::Span> {
82 let mut span: pb::Span = Default::default();
83 let span_size = rmp::decode::read_array_len(reader)
84 .map_err(|err| anyhow!("Error reading span size: {err}"))? as usize;
85 if span_size != SPAN_ELEMENT_COUNT {
86 anyhow::bail!("Expected an array of exactly 12 elements in a span, got {span_size}");
87 }
88 span.service = get_v05_string(reader, dict, "service")?;
90 span.name = get_v05_string(reader, dict, "name")?;
92 span.resource = get_v05_string(reader, dict, "resource")?;
94
95 match read_value(reader)? {
97 Value::Integer(i) => {
98 span.trace_id = i.as_u64().ok_or_else(|| {
99 anyhow!("Error reading span trace_id, value is not an integer: {i}")
100 })?;
101 }
102 val => anyhow::bail!("Error reading span trace_id, value is not an integer: {val}"),
103 };
104 match read_value(reader)? {
106 Value::Integer(i) => {
107 span.span_id = i.as_u64().ok_or_else(|| {
108 anyhow!("Error reading span span_id, value is not an integer: {i}")
109 })?;
110 }
111 val => anyhow::bail!("Error reading span span_id, value is not an integer: {val}"),
112 };
113 match read_value(reader)? {
115 Value::Integer(i) => {
116 span.parent_id = i.as_u64().ok_or_else(|| {
117 anyhow!("Error reading span parent_id, value is not an integer: {i}")
118 })?;
119 }
120 val => anyhow::bail!("Error reading span parent_id, value is not an integer: {val}"),
121 };
122 match read_value(reader)? {
124 Value::Integer(i) => {
125 span.start = i
126 .as_i64()
127 .ok_or_else(|| anyhow!("Error reading span start, value is not an integer: {i}"))?;
128 }
129 val => anyhow::bail!("Error reading span start, value is not an integer: {val}"),
130 };
131 match read_value(reader)? {
133 Value::Integer(i) => {
134 span.duration = i.as_i64().ok_or_else(|| {
135 anyhow!("Error reading span duration, value is not an integer: {i}")
136 })?;
137 }
138 val => anyhow::bail!("Error reading span duration, value is not an integer: {val}"),
139 };
140 match read_value(reader)? {
142 Value::Integer(i) => {
143 span.error = i
144 .as_i64()
145 .ok_or_else(|| anyhow!("Error reading span error, value is not an integer: {i}"))?
146 as i32;
147 }
148 val => anyhow::bail!("Error reading span error, value is not an integer: {val}"),
149 }
150 match read_value(reader)? {
152 Value::Map(meta) => {
153 for (k, v) in meta.iter() {
154 match k {
155 Value::Integer(k) => {
156 match v {
157 Value::Integer(v) => {
158 let key = str_from_dict(dict, *k)?;
159 let val = str_from_dict(dict, *v)?;
160 span.meta.insert(key, val);
161 }
162 _ => anyhow::bail!("Error reading span meta, value is not an integer and can't be looked up in dict: {v}")
163 }
164 }
165 _ => anyhow::bail!("Error reading span meta, key is not an integer and can't be looked up in dict: {k}")
166 }
167 }
168 }
169 val => anyhow::bail!("Error reading span meta, value is not a map: {val}"),
170 }
171 match read_value(reader)? {
173 Value::Map(metrics) => {
174 for (k, v) in metrics.iter() {
175 match k {
176 Value::Integer(k) => {
177 match v {
178 Value::Integer(v) => {
179 let key = str_from_dict(dict, *k)?;
180 span.metrics.insert(key, v.as_f64().ok_or_else(||anyhow!("Error reading span metrics, value is not an integer: {v}"))?);
181 }
182 Value::F64(v) => {
183 let key = str_from_dict(dict, *k)?;
184 span.metrics.insert(key, *v);
185 }
186 _ => anyhow::bail!(
187 "Error reading span metrics, value is not a float or integer: {v}"
188 ),
189 }
190 }
191 _ => anyhow::bail!("Error reading span metrics, key is not an integer: {k}"),
192 }
193 }
194 }
195 val => anyhow::bail!("Error reading span metrics, value is not a map: {val}"),
196 }
197
198 match read_value(reader)? {
200 Value::Integer(s) => span.r#type = str_from_dict(dict, s)?,
201 val => anyhow::bail!("Error reading span type, value is not an integer: {val}"),
202 }
203 Ok(span)
204}
205
206#[inline]
207fn str_from_dict(dict: &[String], id: Integer) -> anyhow::Result<String> {
208 let id = id
209 .as_i64()
210 .ok_or_else(|| anyhow!("Error reading string from dict, id is not an integer: {id}"))?
211 as usize;
212 if id >= dict.len() {
213 anyhow::bail!("Error reading string from dict, id out of bounds: {id}");
214 }
215 Ok(dict[id].to_string())
216}
217
218#[inline]
219fn get_v05_string(
220 reader: &mut Reader<impl Buf>,
221 dict: &[String],
222 field_name: &str,
223) -> anyhow::Result<String> {
224 match read_value(reader)? {
225 Value::Integer(s) => {
226 str_from_dict(dict, s)
227 },
228 val => anyhow::bail!("Error reading {field_name}, value is not an integer and can't be looked up in dict: {val}")
229 }
230}
231
232pub async fn get_v05_traces_from_request_body<B>(
233 body: B,
234) -> anyhow::Result<(usize, Vec<Vec<pb::Span>>)>
235where
236 B: http_body::Body,
237 B::Error: std::error::Error + Send + Sync + 'static,
238{
239 let buffer = body.collect().await?.aggregate();
240 let body_size = buffer.remaining();
241 let mut reader = buffer.reader();
242 let wrapper_size = read_array_len(&mut reader)?;
243 if wrapper_size != 2 {
244 anyhow::bail!("Expected an arrary of exactly 2 elements, got {wrapper_size}");
245 }
246
247 let dict = get_v05_strings_dict(&mut reader)?;
248
249 let traces_size = rmp::decode::read_array_len(&mut reader)?;
250 let mut traces: Vec<Vec<pb::Span>> = Default::default();
251
252 for _ in 0..traces_size {
253 let spans_size = rmp::decode::read_array_len(&mut reader)?;
254 let mut trace: Vec<pb::Span> = Default::default();
255
256 for _ in 0..spans_size {
257 let span = get_v05_span(&mut reader, &dict)?;
258 trace.push(span);
259 }
260 traces.push(trace);
261 }
262 Ok((body_size, traces))
263}
264
/// Payload-level tags passed to `construct_tracer_payload`. Every field defaults to the empty
/// string.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct TracerPayloadTags {
    /// Copied into the payload's `env`.
    pub env: String,
    /// Copied into the payload's `app_version`.
    pub app_version: String,
    /// Copied into the payload's `hostname`.
    pub hostname: String,
    /// Copied into the payload's `runtime_id`.
    pub runtime_id: String,
}
273
274fn search_trace_for_field(root: &pb::Span, trace: &[pb::Span], field: &str) -> Option<String> {
277 if let Some(v) = root.meta.get(field) {
278 if !v.is_empty() {
279 return Some(v.clone());
280 }
281 }
282 for span in trace {
283 if span.span_id == root.span_id {
284 continue;
285 }
286 if let Some(v) = span.meta.get(field) {
287 if !v.is_empty() {
288 return Some(v.clone());
289 }
290 }
291 }
292 None
293}
294
295pub(crate) fn construct_trace_chunk(trace: Vec<pb::Span>) -> pb::TraceChunk {
296 pb::TraceChunk {
297 priority: normalizer::SamplerPriority::None as i32,
298 origin: "".to_string(),
299 spans: trace,
300 tags: HashMap::new(),
301 dropped_trace: false,
302 }
303}
304
305pub(crate) fn construct_tracer_payload(
306 chunks: Vec<pb::TraceChunk>,
307 tracer_tags: &TracerHeaderTags,
308 tracer_payload_tags: TracerPayloadTags,
309) -> pb::TracerPayload {
310 pb::TracerPayload {
311 app_version: tracer_payload_tags.app_version,
312 language_name: tracer_tags.lang.to_string(),
313 container_id: tracer_tags.container_id.to_string(),
314 env: tracer_payload_tags.env,
315 runtime_id: tracer_payload_tags.runtime_id,
316 chunks,
317 hostname: tracer_payload_tags.hostname,
318 language_version: tracer_tags.lang_version.to_string(),
319 tags: HashMap::new(),
320 tracer_version: tracer_tags.tracer_version.to_string(),
321 }
322}
323
324pub(crate) fn cmp_send_data_payloads(a: &pb::TracerPayload, b: &pb::TracerPayload) -> Ordering {
325 a.tracer_version
326 .cmp(&b.tracer_version)
327 .then(a.language_version.cmp(&b.language_version))
328 .then(a.language_name.cmp(&b.language_name))
329 .then(a.hostname.cmp(&b.hostname))
330 .then(a.container_id.cmp(&b.container_id))
331 .then(a.runtime_id.cmp(&b.runtime_id))
332 .then(a.env.cmp(&b.env))
333 .then(a.app_version.cmp(&b.app_version))
334}
335
336pub fn coalesce_send_data(mut data: Vec<SendData>) -> Vec<SendData> {
337 data.sort_unstable_by(|a, b| {
340 a.get_target()
341 .url
342 .to_string()
343 .cmp(&b.get_target().url.to_string())
344 .then(a.get_target().test_token.cmp(&b.get_target().test_token))
345 });
346 data.dedup_by(|a, b| {
347 if a.get_target().url == b.get_target().url
348 && a.get_target().test_token == b.get_target().test_token
349 {
350 if a.size + b.size < MAX_PAYLOAD_SIZE / 2 {
355 b.tracer_payloads.append(&mut a.tracer_payloads);
357 b.size += a.size;
358 return true;
359 }
360 }
361 false
362 });
363 for send_data in data.iter_mut() {
366 send_data.tracer_payloads.merge();
367 }
368 data
369}
370
371pub fn get_root_span_index(trace: &[pb::Span]) -> anyhow::Result<usize> {
372 if trace.is_empty() {
373 anyhow::bail!("Cannot find root span index in an empty trace.");
374 }
375
376 for (i, span) in trace.iter().enumerate().rev() {
379 if span.parent_id == 0 {
380 return Ok(i);
381 }
382 }
383
384 let mut span_ids: HashSet<u64> = HashSet::with_capacity(trace.len());
385 for span in trace.iter() {
386 span_ids.insert(span.span_id);
387 }
388
389 let mut root_span_id = None;
390 for (i, span) in trace.iter().enumerate() {
391 if !span_ids.contains(&span.parent_id) {
393 if root_span_id.is_some() {
394 debug!(
395 trace_id = &trace[0].trace_id,
396 "trace has multiple root spans"
397 );
398 }
399 root_span_id = Some(i);
400 }
401 }
402 Ok(match root_span_id {
403 Some(i) => i,
404 None => {
405 debug!(
406 trace_id = &trace[0].trace_id,
407 "Could not find the root span for trace"
408 );
409 trace.len() - 1
410 }
411 })
412}
413
414pub fn compute_top_level_span(trace: &mut [pb::Span]) {
421 let mut span_id_to_service: HashMap<u64, String> = HashMap::new();
422 for span in trace.iter() {
423 span_id_to_service.insert(span.span_id, span.service.clone());
424 }
425 for span in trace.iter_mut() {
426 if span.parent_id == 0 {
427 set_top_level_span(span, true);
428 continue;
429 }
430 match span_id_to_service.get(&span.parent_id) {
431 Some(parent_span_service) => {
432 if !parent_span_service.eq(&span.service) {
433 set_top_level_span(span, true)
435 }
436 }
437 None => {
438 set_top_level_span(span, true)
440 }
441 }
442 }
443}
444
445pub fn has_top_level(span: &pb::Span) -> bool {
447 span.metrics
448 .get(TRACER_TOP_LEVEL_KEY)
449 .is_some_and(|v| *v == 1.0)
450 || span.metrics.get(TOP_LEVEL_KEY).is_some_and(|v| *v == 1.0)
451}
452
453fn set_top_level_span(span: &mut pb::Span, is_top_level: bool) {
454 if is_top_level {
455 span.metrics.insert(TOP_LEVEL_KEY.to_string(), 1.0);
456 } else {
457 span.metrics.remove(TOP_LEVEL_KEY);
458 }
459}
460
461pub fn set_serverless_root_span_tags(
462 span: &mut pb::Span,
463 app_name: Option<String>,
464 env_type: &EnvironmentType,
465) {
466 let origin_tag = match env_type {
467 EnvironmentType::CloudFunction => "cloudfunction",
468 EnvironmentType::AzureFunction => "azurefunction",
469 EnvironmentType::AzureSpringApp => "azurespringapp",
470 EnvironmentType::LambdaFunction => "lambda", };
472 span.meta
473 .insert("_dd.origin".to_string(), origin_tag.to_string());
474 span.meta
475 .insert("origin".to_string(), origin_tag.to_string());
476
477 if let Some(function_name) = app_name {
478 match env_type {
479 EnvironmentType::CloudFunction
480 | EnvironmentType::AzureFunction
481 | EnvironmentType::LambdaFunction => {
482 span.meta.insert("functionname".to_string(), function_name);
483 }
484 _ => {}
485 }
486 }
487}
488
489fn update_tracer_top_level(span: &mut pb::Span) {
490 if span.metrics.contains_key(TRACER_TOP_LEVEL_KEY) {
491 span.metrics.insert(TOP_LEVEL_KEY.to_string(), 1.0);
492 }
493}
494
/// Serverless environment a span originates from. `set_serverless_root_span_tags` maps each
/// variant to the origin string noted below.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum EnvironmentType {
    /// Origin `cloudfunction`.
    CloudFunction,
    /// Origin `azurefunction`.
    AzureFunction,
    /// Origin `azurespringapp`.
    AzureSpringApp,
    /// Origin `lambda`.
    LambdaFunction,
}
502
/// Optional environment metadata that the `enrich_span_with_*` helpers copy into span tags.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct MiniAgentMetadata {
    /// Written to the `asa.hostname` tag when present.
    pub azure_spring_app_hostname: Option<String>,
    /// Written to the `asa.name` tag when present.
    pub azure_spring_app_name: Option<String>,
    /// Project id used for the `gcrfx.*` tags.
    pub gcp_project_id: Option<String>,
    /// Region used for the `gcrfx.*` tags.
    pub gcp_region: Option<String>,
    /// Written to the `_dd.serverless_compat_version` tag when present.
    pub version: Option<String>,
}

impl Default for MiniAgentMetadata {
    /// Leaves every field unset except `version`, which is read from the
    /// `DD_SERVERLESS_COMPAT_VERSION` environment variable (`None` when it cannot be read).
    fn default() -> Self {
        MiniAgentMetadata {
            azure_spring_app_hostname: None,
            azure_spring_app_name: None,
            gcp_project_id: None,
            gcp_region: None,
            version: env::var("DD_SERVERLESS_COMPAT_VERSION").ok(),
        }
    }
}
523
524pub fn enrich_span_with_mini_agent_metadata(
525 span: &mut pb::Span,
526 mini_agent_metadata: &MiniAgentMetadata,
527) {
528 if let Some(azure_spring_app_hostname) = &mini_agent_metadata.azure_spring_app_hostname {
529 span.meta.insert(
530 "asa.hostname".to_string(),
531 azure_spring_app_hostname.to_string(),
532 );
533 }
534 if let Some(azure_spring_app_name) = &mini_agent_metadata.azure_spring_app_name {
535 span.meta
536 .insert("asa.name".to_string(), azure_spring_app_name.to_string());
537 }
538 if let Some(serverless_compat_version) = &mini_agent_metadata.version {
539 span.meta.insert(
540 "_dd.serverless_compat_version".to_string(),
541 serverless_compat_version.to_string(),
542 );
543 }
544}
545
546pub fn enrich_span_with_google_cloud_function_metadata(
547 span: &mut pb::Span,
548 mini_agent_metadata: &MiniAgentMetadata,
549 function: Option<String>,
550) {
551 #[allow(clippy::todo)]
552 let Some(region) = &mini_agent_metadata.gcp_region
553 else {
554 todo!()
555 };
556 #[allow(clippy::todo)]
557 let Some(project) = &mini_agent_metadata.gcp_project_id
558 else {
559 todo!()
560 };
561
562 if let Some(function) = function {
563 if !region.is_empty() && !project.is_empty() {
564 let resource_name = format!(
565 "projects/{}/locations/{}/functions/{}",
566 project, region, function
567 );
568
569 span.meta
570 .insert("gcrfx.location".to_string(), region.to_string());
571 span.meta
572 .insert("gcrfx.project_id".to_string(), project.to_string());
573 span.meta
574 .insert("gcrfx.resource_name".to_string(), resource_name.to_string());
575 }
576 }
577}
578
579pub fn enrich_span_with_azure_function_metadata(span: &mut pb::Span) {
580 if span.name == "azure.apim" {
581 return;
582 }
583
584 if let Some(aas_metadata) = &*azure_app_services::AAS_METADATA_FUNCTION {
585 span.meta.extend(
586 aas_metadata
587 .get_function_tags()
588 .map(|(name, value)| (name.to_string(), value.to_string())),
589 );
590 }
591}
592
593pub fn collect_trace_chunks<T: TraceData>(
594 traces: Vec<Vec<crate::span::v04::Span<T>>>,
595 use_v05_format: bool,
596) -> anyhow::Result<TraceChunks<T>> {
597 if use_v05_format {
598 let mut shared_dict = SharedDict::default();
599 let mut v05_traces: Vec<Vec<v05::Span>> = Vec::with_capacity(traces.len());
600 for trace in traces {
601 let trace_len = trace.len();
602 let v05_trace = trace.into_iter().try_fold(
603 Vec::with_capacity(trace_len),
604 |mut acc, span| -> anyhow::Result<Vec<v05::Span>> {
605 acc.push(v05::from_v04_span(span, &mut shared_dict)?);
606 Ok(acc)
607 },
608 )?;
609
610 v05_traces.push(v05_trace);
611 }
612 Ok(TraceChunks::V05((shared_dict, v05_traces)))
613 } else {
614 Ok(TraceChunks::V04(traces))
615 }
616}
617
618pub fn collect_pb_trace_chunks<T: tracer_payload::TraceChunkProcessor>(
619 mut traces: Vec<Vec<pb::Span>>,
620 tracer_header_tags: &TracerHeaderTags,
621 process_chunk: &mut T,
622 is_agentless: bool,
623) -> anyhow::Result<TracerPayloadCollection> {
624 let mut trace_chunks: Vec<pb::TraceChunk> = Vec::new();
625
626 let mut tracer_payload_tags = TracerPayloadTags::default();
628
629 for trace in traces.iter_mut() {
630 if is_agentless {
631 if let Err(e) = normalizer::normalize_trace(trace) {
632 error!("Error normalizing trace: {e}");
633 }
634 }
635
636 let mut chunk = construct_trace_chunk(trace.to_vec());
637
638 let root_span_index = match get_root_span_index(trace) {
639 Ok(res) => res,
640 Err(e) => {
641 error!("Error getting the root span index of a trace, skipping. {e}");
642 continue;
643 }
644 };
645
646 if let Err(e) = normalizer::normalize_chunk(&mut chunk, root_span_index) {
647 error!("Error normalizing trace chunk: {e}");
648 }
649
650 for span in chunk.spans.iter_mut() {
651 if tracer_header_tags.client_computed_top_level {
653 update_tracer_top_level(span);
654 }
655 }
656
657 if !tracer_header_tags.client_computed_top_level {
658 compute_top_level_span(&mut chunk.spans);
659 }
660
661 process_chunk.process(&mut chunk, root_span_index);
662
663 trace_chunks.push(chunk);
664
665 if is_agentless {
666 let root = &trace[root_span_index];
669 if tracer_payload_tags.env.is_empty() {
670 if let Some(mut v) = search_trace_for_field(root, trace, "env") {
671 libdd_trace_normalization::normalize_utils::normalize_tag(&mut v);
674 if !v.is_empty() {
675 tracer_payload_tags.env = v;
676 }
677 }
678 }
679 if tracer_payload_tags.app_version.is_empty() {
680 if let Some(v) = search_trace_for_field(root, trace, "version") {
681 tracer_payload_tags.app_version = v;
682 }
683 }
684 if tracer_payload_tags.hostname.is_empty() {
685 if let Some(v) = search_trace_for_field(root, trace, "_dd.hostname") {
686 tracer_payload_tags.hostname = v;
687 }
688 }
689 if tracer_payload_tags.runtime_id.is_empty() {
690 if let Some(v) = search_trace_for_field(root, trace, "runtime-id") {
691 tracer_payload_tags.runtime_id = v;
692 }
693 }
694 }
695 }
696
697 Ok(TracerPayloadCollection::V07(vec![
698 construct_tracer_payload(trace_chunks, tracer_header_tags, tracer_payload_tags),
699 ]))
700}
701
702pub fn is_measured(span: &pb::Span) -> bool {
704 span.metrics.get(MEASURED_KEY).is_some_and(|v| *v == 1.0)
705}
706
707pub fn is_partial_snapshot(span: &pb::Span) -> bool {
713 span.metrics
714 .get(PARTIAL_VERSION_KEY)
715 .is_some_and(|v| *v >= 0.0)
716}
717
718#[cfg(test)]
719mod tests {
720 use super::*;
721 use crate::{
722 span::SharedDictBytes,
723 test_utils::{create_test_no_alloc_span, create_test_span},
724 };
725 use http::Request;
726 use libdd_common::{http_common, Endpoint};
727 use serde_json::json;
728
729 fn find_index_in_dict(dict: &SharedDictBytes, value: &str) -> Option<u32> {
730 let idx = dict.iter().position(|e| e.as_str() == value);
731 idx.map(|idx| idx.try_into().unwrap())
732 }
733
734 #[test]
735 fn test_coalescing_does_not_exceed_max_size() {
736 fn dummy() -> SendData {
737 SendData::new(
738 MAX_PAYLOAD_SIZE / 5 + 1,
739 TracerPayloadCollection::V07(vec![pb::TracerPayload {
740 container_id: "".to_string(),
741 language_name: "".to_string(),
742 language_version: "".to_string(),
743 tracer_version: "".to_string(),
744 runtime_id: "".to_string(),
745 chunks: vec![pb::TraceChunk {
746 priority: 0,
747 origin: "".to_string(),
748 spans: vec![],
749 tags: Default::default(),
750 dropped_trace: false,
751 }],
752 tags: Default::default(),
753 env: "".to_string(),
754 hostname: "".to_string(),
755 app_version: "".to_string(),
756 }]),
757 TracerHeaderTags::default(),
758 &Endpoint::default(),
759 )
760 }
761 let coalesced = coalesce_send_data(vec![dummy(), dummy(), dummy(), dummy(), dummy()]);
762 assert_eq!(
763 5,
764 coalesced
765 .iter()
766 .map(|s| s.tracer_payloads.size())
767 .sum::<usize>()
768 );
769 assert!(
771 coalesced
772 .iter()
773 .map(|s| {
774 if let TracerPayloadCollection::V07(collection) = &s.tracer_payloads {
775 collection.iter().map(|s| s.chunks.len()).max().unwrap()
776 } else {
777 0
778 }
779 })
780 .max()
781 .unwrap()
782 > 1
783 );
784 assert!(coalesced.len() > 1 && coalesced.len() < 5);
785 }
786
787 #[tokio::test]
788 #[allow(clippy::type_complexity)]
789 #[cfg_attr(all(miri, target_os = "macos"), ignore)]
790 async fn test_get_v05_traces_from_request_body() {
791 let data: (
792 Vec<String>,
793 Vec<
794 Vec<(
795 u8,
796 u8,
797 u8,
798 u64,
799 u64,
800 u64,
801 i64,
802 i64,
803 i32,
804 HashMap<u8, u8>,
805 HashMap<u8, f64>,
806 u8,
807 )>,
808 >,
809 ) = (
810 vec![
811 "baggage".to_string(),
812 "item".to_string(),
813 "elasticsearch.version".to_string(),
814 "7.0".to_string(),
815 "my-name".to_string(),
816 "X".to_string(),
817 "my-service".to_string(),
818 "my-resource".to_string(),
819 "_dd.sampling_rate_whatever".to_string(),
820 "value whatever".to_string(),
821 "sql".to_string(),
822 ],
823 vec![vec![(
824 6,
825 4,
826 7,
827 1,
828 2,
829 3,
830 123,
831 456,
832 1,
833 HashMap::from([(8, 9), (0, 1), (2, 3)]),
834 HashMap::from([(5, 1.2)]),
835 10,
836 )]],
837 );
838 let bytes = rmp_serde::to_vec(&data).unwrap();
839 let res = get_v05_traces_from_request_body(http_common::Body::from(bytes)).await;
840 assert!(res.is_ok());
841 let (_, traces) = res.unwrap();
842 let span = traces[0][0].clone();
843 let test_span = pb::Span {
844 service: "my-service".to_string(),
845 name: "my-name".to_string(),
846 resource: "my-resource".to_string(),
847 trace_id: 1,
848 span_id: 2,
849 parent_id: 3,
850 start: 123,
851 duration: 456,
852 error: 1,
853 meta: HashMap::from([
854 ("baggage".to_string(), "item".to_string()),
855 ("elasticsearch.version".to_string(), "7.0".to_string()),
856 (
857 "_dd.sampling_rate_whatever".to_string(),
858 "value whatever".to_string(),
859 ),
860 ]),
861 metrics: HashMap::from([("X".to_string(), 1.2)]),
862 meta_struct: HashMap::default(),
863 r#type: "sql".to_string(),
864 span_links: vec![],
865 span_events: vec![],
866 };
867 assert_eq!(span, test_span);
868 }
869
870 #[tokio::test]
871 #[cfg_attr(miri, ignore)]
872 async fn test_get_traces_from_request_body() {
873 let pairs = vec![
874 (
875 json!([{
876 "service": "test-service",
877 "name": "test-service-name",
878 "resource": "test-service-resource",
879 "trace_id": 111,
880 "span_id": 222,
881 "parent_id": 333,
882 "start": 1,
883 "duration": 5,
884 "error": 0,
885 "meta": {},
886 "metrics": {},
887 }]),
888 vec![vec![pb::Span {
889 service: "test-service".to_string(),
890 name: "test-service-name".to_string(),
891 resource: "test-service-resource".to_string(),
892 trace_id: 111,
893 span_id: 222,
894 parent_id: 333,
895 start: 1,
896 duration: 5,
897 error: 0,
898 meta: HashMap::new(),
899 metrics: HashMap::new(),
900 meta_struct: HashMap::new(),
901 r#type: "".to_string(),
902 span_links: vec![],
903 span_events: vec![],
904 }]],
905 ),
906 (
907 json!([{
908 "name": "test-service-name",
909 "resource": "test-service-resource",
910 "trace_id": 111,
911 "span_id": 222,
912 "start": 1,
913 "duration": 5,
914 "meta": {},
915 }]),
916 vec![vec![pb::Span {
917 service: "".to_string(),
918 name: "test-service-name".to_string(),
919 resource: "test-service-resource".to_string(),
920 trace_id: 111,
921 span_id: 222,
922 parent_id: 0,
923 start: 1,
924 duration: 5,
925 error: 0,
926 meta: HashMap::new(),
927 metrics: HashMap::new(),
928 meta_struct: HashMap::new(),
929 r#type: "".to_string(),
930 span_links: vec![],
931 span_events: vec![],
932 }]],
933 ),
934 ];
935
936 for (trace_input, output) in pairs {
937 let bytes = rmp_serde::to_vec(&vec![&trace_input]).unwrap();
938 let request = Request::builder()
939 .body(http_common::Body::from(bytes))
940 .unwrap();
941 let res = get_traces_from_request_body(request.into_body()).await;
942 assert!(res.is_ok());
943 assert_eq!(res.unwrap().1, output);
944 }
945 }
946
947 #[tokio::test]
948 #[cfg_attr(miri, ignore)]
949 async fn test_get_traces_from_request_body_with_span_links() {
950 let trace_input = json!([[{
951 "service": "test-service",
952 "name": "test-name",
953 "resource": "test-resource",
954 "trace_id": 111,
955 "span_id": 222,
956 "parent_id": 333,
957 "start": 1,
958 "duration": 5,
959 "error": 0,
960 "meta": {},
961 "metrics": {},
962 "span_links": [{
963 "trace_id": 999,
964 "span_id": 888,
965 "trace_id_high": 777,
966 "attributes": {"key": "value"},
967 "tracestate": "vendor=value"
968 }]
970 }]]);
971
972 let expected_output = vec![vec![pb::Span {
973 service: "test-service".to_string(),
974 name: "test-name".to_string(),
975 resource: "test-resource".to_string(),
976 trace_id: 111,
977 span_id: 222,
978 parent_id: 333,
979 start: 1,
980 duration: 5,
981 error: 0,
982 meta: HashMap::new(),
983 metrics: HashMap::new(),
984 meta_struct: HashMap::new(),
985 r#type: String::new(),
986 span_links: vec![pb::SpanLink {
987 trace_id: 999,
988 span_id: 888,
989 trace_id_high: 777,
990 attributes: HashMap::from([("key".to_string(), "value".to_string())]),
991 tracestate: "vendor=value".to_string(),
992 flags: 0, }],
994 span_events: vec![],
995 }]];
996
997 let bytes = rmp_serde::to_vec(&trace_input).unwrap();
998 let request = Request::builder()
999 .body(http_common::Body::from(bytes))
1000 .unwrap();
1001
1002 let res = get_traces_from_request_body(request.into_body()).await;
1003 assert!(res.is_ok(), "Failed to deserialize: {res:?}");
1004 assert_eq!(res.unwrap().1, expected_output);
1005 }
1006
1007 #[test]
1008 fn test_get_root_span_index_from_complete_trace() {
1009 let trace = vec![
1010 create_test_span(1234, 12341, 0, 1, false),
1011 create_test_span(1234, 12342, 12341, 1, false),
1012 create_test_span(1234, 12343, 12342, 1, false),
1013 ];
1014
1015 let root_span_index = get_root_span_index(&trace);
1016 assert!(root_span_index.is_ok());
1017 assert_eq!(root_span_index.unwrap(), 0);
1018 }
1019
1020 #[test]
1021 fn test_get_root_span_index_from_partial_trace() {
1022 let trace = vec![
1023 create_test_span(1234, 12342, 12341, 1, false),
1024 create_test_span(1234, 12341, 12340, 1, false), create_test_span(1234, 12343, 12342, 1, false),
1027 ];
1028
1029 let root_span_index = get_root_span_index(&trace);
1030 assert!(root_span_index.is_ok());
1031 assert_eq!(root_span_index.unwrap(), 1);
1032 }
1033
1034 #[test]
1035 fn test_set_serverless_root_span_tags_azure_function() {
1036 let mut span = create_test_span(1234, 12342, 12341, 1, false);
1037 set_serverless_root_span_tags(
1038 &mut span,
1039 Some("test_function".to_string()),
1040 &EnvironmentType::AzureFunction,
1041 );
1042 assert_eq!(
1043 span.meta,
1044 HashMap::from([
1045 (
1046 "runtime-id".to_string(),
1047 "test-runtime-id-value".to_string()
1048 ),
1049 ("_dd.origin".to_string(), "azurefunction".to_string()),
1050 ("origin".to_string(), "azurefunction".to_string()),
1051 ("functionname".to_string(), "test_function".to_string()),
1052 ("env".to_string(), "test-env".to_string()),
1053 ("service".to_string(), "test-service".to_string())
1054 ]),
1055 );
1056 }
1057
1058 #[test]
1059 fn test_set_serverless_root_span_tags_cloud_function() {
1060 let mut span = create_test_span(1234, 12342, 12341, 1, false);
1061 set_serverless_root_span_tags(
1062 &mut span,
1063 Some("test_function".to_string()),
1064 &EnvironmentType::CloudFunction,
1065 );
1066 assert_eq!(
1067 span.meta,
1068 HashMap::from([
1069 (
1070 "runtime-id".to_string(),
1071 "test-runtime-id-value".to_string()
1072 ),
1073 ("_dd.origin".to_string(), "cloudfunction".to_string()),
1074 ("origin".to_string(), "cloudfunction".to_string()),
1075 ("functionname".to_string(), "test_function".to_string()),
1076 ("env".to_string(), "test-env".to_string()),
1077 ("service".to_string(), "test-service".to_string())
1078 ]),
1079 );
1080 }
1081
1082 #[test]
1083 fn test_has_top_level() {
1084 let top_level_span = create_test_span(123, 1234, 12, 1, true);
1085 let not_top_level_span = create_test_span(123, 1234, 12, 1, false);
1086 assert!(has_top_level(&top_level_span));
1087 assert!(!has_top_level(¬_top_level_span));
1088 }
1089
1090 #[test]
1091 fn test_is_measured() {
1092 let mut measured_span = create_test_span(123, 1234, 12, 1, true);
1093 measured_span.metrics.insert(MEASURED_KEY.into(), 1.0);
1094 let not_measured_span = create_test_span(123, 1234, 12, 1, true);
1095 assert!(is_measured(&measured_span));
1096 assert!(!is_measured(¬_measured_span));
1097 }
1098
1099 #[test]
1100 fn test_compute_top_level() {
1101 let mut span_with_different_service = create_test_span(123, 5, 2, 1, false);
1102 span_with_different_service.service = "another_service".into();
1103 let mut trace = vec![
1104 create_test_span(123, 1, 0, 1, false),
1106 create_test_span(123, 2, 1, 1, false),
1108 create_test_span(123, 4, 3, 1, false),
1111 span_with_different_service,
1114 ];
1115
1116 compute_top_level_span(trace.as_mut_slice());
1117
1118 let spans_marked_as_top_level: Vec<u64> = trace
1119 .iter()
1120 .filter_map(|span| {
1121 if has_top_level(span) {
1122 Some(span.span_id)
1123 } else {
1124 None
1125 }
1126 })
1127 .collect();
1128 assert_eq!(spans_marked_as_top_level, [1, 4, 5])
1129 }
1130
/// Checks that `collect_trace_chunks`, called with `true` as its second argument, returns a
/// `TraceChunks::V05` payload made of a string dictionary and the traces, and that the single
/// input span comes back with the expected numeric fields and dictionary-indexed tags.
#[test]
fn test_collect_trace_chunks_v05() {
    let chunk = vec![create_test_no_alloc_span(123, 456, 789, 1, true)];

    let collection = collect_trace_chunks(vec![chunk], true).unwrap();

    // Any other variant means the v05 path was not taken.
    let TraceChunks::V05((dict, traces)) = collection else {
        panic!("Unexpected type");
    };

    assert_eq!(dict.len(), 16);

    let span = &traces[0][0];
    assert_eq!(span.service, 1);
    assert_eq!(span.name, 2);
    assert_eq!(span.resource, 3);
    assert_eq!(span.trace_id, 123);
    assert_eq!(span.span_id, 456);
    assert_eq!(span.parent_id, 789);
    assert_eq!(span.start, 1);
    // NOTE(review): `span.error` used to be asserted twice in a row here; the second copy was
    // possibly meant to cover another field (e.g. `duration`) - confirm against the fixture.
    assert_eq!(span.error, 0);
    assert_eq!(span.r#type, 15);

    // Resolves a string to its position in the dictionary; panics if the string is absent.
    let index_of = |text: &'static str| find_index_in_dict(&dict, text).unwrap();

    // Both keys and values of `meta` are stored as dictionary indices, so each expected pair is
    // translated through the dictionary before comparing.
    for (key, value) in [
        ("service", "test-service"),
        ("env", "test-env"),
        ("runtime-id", "test-runtime-id-value"),
        ("_dd.origin", "cloudfunction"),
        ("origin", "cloudfunction"),
        ("functionname", "dummy_function_name"),
    ] {
        assert_eq!(
            *span.meta.get(&index_of(key)).unwrap(),
            index_of(value),
            "unexpected dictionary index stored for meta key {key:?}"
        );
    }

    assert_eq!(*span.metrics.get(&index_of("_top_level")).unwrap(), 1.0);
}
1205
/// Encodes a one-span trace whose `meta` map holds a JSON `null` with `rmp_serde`, decodes it
/// back into `pb::Span`, and checks that the regular entries survive while the null-valued key
/// is not present in the decoded map.
#[test]
fn test_rmp_serde_deserialize_meta_with_null_values() {
    let payload = vec![vec![json!({
        "service": "test-service",
        "name": "test_name",
        "resource": "test-resource",
        "trace_id": 1_u64,
        "span_id": 2_u64,
        "parent_id": 0_u64,
        "start": 0_i64,
        "duration": 5_i64,
        "error": 0_i32,
        "meta": {
            "service": "test-service",
            "env": "test-env",
            "runtime-id": "test-runtime-id-value",
            "problematic_key": null
        },
        "metrics": {},
        "type": "",
        "meta_struct": {},
        "span_links": [],
        "span_events": []
    })]];

    let bytes = rmp_serde::to_vec(&payload).unwrap();
    let decoded: Vec<Vec<pb::Span>> = rmp_serde::from_read(bytes.as_slice())
        .expect("Failed to deserialize traces with null values in meta");

    // Exactly one trace containing exactly one span must come back.
    assert_eq!(1, decoded.len());
    let first_trace = &decoded[0];
    assert_eq!(1, first_trace.len());
    let span = &first_trace[0];

    assert_eq!("test-service", span.service);
    assert_eq!("test_name", span.name);
    assert_eq!("test-resource", span.resource);

    // Non-null meta entries are preserved as-is.
    for (key, expected) in [
        ("service", "test-service"),
        ("env", "test-env"),
        ("runtime-id", "test-runtime-id-value"),
    ] {
        assert_eq!(expected, span.meta.get(key).unwrap());
    }

    let null_key_present = span.meta.contains_key("problematic_key");
    assert!(
        !null_key_present,
        "Null value should be skipped, but key was present"
    );
}
1256
/// Runs `enrich_span_with_azure_function_metadata` on a span named `azure.function` and, only
/// when `azure_app_services::AAS_METADATA_FUNCTION` holds a value, expects every `aas.*` tag
/// listed below to be present in the span's `meta`.
#[test]
fn test_enrich_span_with_azure_function_metadata_adds_tags_for_non_apim() {
    const EXPECTED_TAGS: [&str; 12] = [
        "aas.resource.id",
        "aas.environment.instance_id",
        "aas.environment.instance_name",
        "aas.subscription.id",
        "aas.environment.os",
        "aas.environment.runtime",
        "aas.environment.runtime_version",
        "aas.environment.function_runtime",
        "aas.resource.group",
        "aas.site.name",
        "aas.site.kind",
        "aas.site.type",
    ];

    let mut span = create_test_span(1234, 12342, 12341, 1, false);
    span.name = String::from("azure.function");

    enrich_span_with_azure_function_metadata(&mut span);

    // Without the metadata there is nothing to assert, so the test passes vacuously.
    if azure_app_services::AAS_METADATA_FUNCTION.is_some() {
        for tag in EXPECTED_TAGS {
            assert!(span.meta.contains_key(tag), "missing tag {tag}");
        }
    }
}
1282
/// Runs `enrich_span_with_azure_function_metadata` on a span named `azure.apim` and expects
/// none of the `aas.*` tags listed below to appear in the span's `meta`.
#[test]
fn test_enrich_span_with_azure_function_metadata_skips_azure_apim() {
    const FORBIDDEN_TAGS: [&str; 12] = [
        "aas.resource.id",
        "aas.environment.instance_id",
        "aas.environment.instance_name",
        "aas.subscription.id",
        "aas.environment.os",
        "aas.environment.runtime",
        "aas.environment.runtime_version",
        "aas.environment.function_runtime",
        "aas.resource.group",
        "aas.site.name",
        "aas.site.kind",
        "aas.site.type",
    ];

    let mut span = create_test_span(1234, 12342, 12341, 1, false);
    span.name = String::from("azure.apim");

    enrich_span_with_azure_function_metadata(&mut span);

    // Unlike the non-APIM test, these checks run unconditionally.
    for tag in FORBIDDEN_TAGS {
        assert!(!span.meta.contains_key(tag), "unexpected tag {tag}");
    }
}
1304
/// Feeds `collect_pb_trace_chunks` two single-span chunks. The first span has `env` and
/// `runtime-id` removed from its meta, the second carries `version`, `env`, `_dd.hostname`
/// and `runtime-id`; the first payload is expected to expose the second span's values.
#[test]
fn test_collect_pb_trace_chunks_searches_multiple_root_spans_for_fields() {
    let mut first_root_span = create_test_span(1, 1, 0, 1, true);
    for key in ["env", "runtime-id"] {
        first_root_span.meta.remove(key);
    }

    let mut second_root_span = create_test_span(2, 3, 0, 1, true);
    for (key, value) in [
        ("version", "1.2.3"),
        ("env", "prod"),
        ("_dd.hostname", "my-host"),
        ("runtime-id", "123"),
    ] {
        second_root_span
            .meta
            .insert(key.to_string(), value.to_string());
    }

    let chunks = vec![vec![first_root_span], vec![second_root_span]];
    let result = collect_pb_trace_chunks(
        chunks,
        &TracerHeaderTags::default(),
        &mut tracer_payload::DefaultTraceChunkProcessor,
        true,
    )
    .unwrap();

    let TracerPayloadCollection::V07(payloads) = result else {
        panic!("expected TracerPayloadCollection::V07");
    };

    let payload = &payloads[0];
    assert_eq!(payload.app_version, "1.2.3");
    assert_eq!(payload.env, "prod");
    assert_eq!(payload.hostname, "my-host");
    assert_eq!(payload.runtime_id, "123");
}
1343
/// Feeds `collect_pb_trace_chunks` one chunk holding two spans. The first span has `env` and
/// `runtime-id` removed from its meta, the second carries `version`, `env`, `_dd.hostname`
/// and `runtime-id`; the payload is expected to expose the second span's values.
#[test]
fn test_collect_pb_trace_chunks_searches_non_root_spans_for_fields() {
    let mut root_span = create_test_span(1, 1, 0, 1, true);
    for key in ["env", "runtime-id"] {
        root_span.meta.remove(key);
    }

    let mut child_span = create_test_span(1, 2, 1, 1, false);
    for (key, value) in [
        ("version", "1.2.3"),
        ("env", "prod"),
        ("_dd.hostname", "my-host"),
        ("runtime-id", "123"),
    ] {
        child_span.meta.insert(key.to_string(), value.to_string());
    }

    let chunks = vec![vec![root_span, child_span]];
    let result = collect_pb_trace_chunks(
        chunks,
        &TracerHeaderTags::default(),
        &mut tracer_payload::DefaultTraceChunkProcessor,
        true,
    )
    .unwrap();

    let TracerPayloadCollection::V07(payloads) = result else {
        panic!("expected TracerPayloadCollection::V07");
    };

    let payload = &payloads[0];
    assert_eq!(payload.app_version, "1.2.3");
    assert_eq!(payload.env, "prod");
    assert_eq!(payload.hostname, "my-host");
    assert_eq!(payload.runtime_id, "123");
}
1381
/// Puts conflicting `version`, `env`, `_dd.hostname` and `runtime-id` values on two spans of
/// the same chunk and expects the payload built by `collect_pb_trace_chunks` to carry the
/// values set on the first (`root-*`) span rather than the second (`child-*`) one.
#[test]
fn test_collect_pb_trace_chunks_root_span_takes_priority_over_child() {
    let mut root_span = create_test_span(1, 1, 0, 1, true);
    for (key, value) in [
        ("version", "root-version"),
        ("env", "root-env"),
        ("_dd.hostname", "root-host"),
        ("runtime-id", "root-runtime-id"),
    ] {
        root_span.meta.insert(key.to_string(), value.to_string());
    }

    let mut child_span = create_test_span(1, 2, 1, 1, false);
    for (key, value) in [
        ("version", "child-version"),
        ("env", "child-env"),
        ("_dd.hostname", "child-host"),
        ("runtime-id", "child-runtime-id"),
    ] {
        child_span.meta.insert(key.to_string(), value.to_string());
    }

    let chunks = vec![vec![root_span, child_span]];
    let result = collect_pb_trace_chunks(
        chunks,
        &TracerHeaderTags::default(),
        &mut tracer_payload::DefaultTraceChunkProcessor,
        true,
    )
    .unwrap();

    let TracerPayloadCollection::V07(payloads) = result else {
        panic!("expected TracerPayloadCollection::V07");
    };

    let payload = &payloads[0];
    assert_eq!(payload.app_version, "root-version");
    assert_eq!(payload.env, "root-env");
    assert_eq!(payload.hostname, "root-host");
    assert_eq!(payload.runtime_id, "root-runtime-id");
}
1430
/// Sets `version`, `env`, `_dd.hostname` and `runtime-id` to empty strings on the first span
/// and to real values on the second span of the same chunk, then expects the payload built by
/// `collect_pb_trace_chunks` to carry the second span's non-empty values.
#[test]
fn test_collect_pb_trace_chunks_skips_empty_root_span_value() {
    let mut root_span = create_test_span(1, 1, 0, 1, true);
    for key in ["version", "env", "_dd.hostname", "runtime-id"] {
        root_span.meta.insert(key.to_string(), "".to_string());
    }

    let mut child_span = create_test_span(1, 2, 1, 1, false);
    for (key, value) in [
        ("version", "1.2.3"),
        ("env", "prod"),
        ("_dd.hostname", "my-host"),
        ("runtime-id", "123"),
    ] {
        child_span.meta.insert(key.to_string(), value.to_string());
    }

    let chunks = vec![vec![root_span, child_span]];
    let result = collect_pb_trace_chunks(
        chunks,
        &TracerHeaderTags::default(),
        &mut tracer_payload::DefaultTraceChunkProcessor,
        true,
    )
    .unwrap();

    let TracerPayloadCollection::V07(payloads) = result else {
        panic!("expected TracerPayloadCollection::V07");
    };

    let payload = &payloads[0];
    assert_eq!(payload.app_version, "1.2.3");
    assert_eq!(payload.env, "prod");
    assert_eq!(payload.hostname, "my-host");
    assert_eq!(payload.runtime_id, "123");
}
1475
/// Gives a single span the upper-case env `PRODUCTION` and expects the payload returned by
/// `collect_pb_trace_chunks` to report the lower-case `production`.
#[test]
fn test_collect_pb_trace_chunks_normalizes_env() {
    let mut span = create_test_span(1, 1, 0, 1, true);
    span.meta
        .insert(String::from("env"), String::from("PRODUCTION"));

    let chunks = vec![vec![span]];
    let result = collect_pb_trace_chunks(
        chunks,
        &TracerHeaderTags::default(),
        &mut tracer_payload::DefaultTraceChunkProcessor,
        true,
    )
    .unwrap();

    let TracerPayloadCollection::V07(payloads) = result else {
        panic!("expected TracerPayloadCollection::V07");
    };

    let payload = &payloads[0];
    assert_eq!(payload.env, "production");
}
1495
/// Gives the span of the first chunk the env `!!!` and the span of the second chunk the env
/// `prod`, then expects the first payload's env to be `prod`.
/// NOTE(review): per the test name, `!!!` presumably normalizes to an empty string - confirm
/// against the normalizer.
#[test]
fn test_collect_pb_trace_chunks_skips_env_empty_after_normalization() {
    let mut first_root_span = create_test_span(1, 1, 0, 1, true);
    first_root_span
        .meta
        .insert(String::from("env"), String::from("!!!"));

    let mut second_root_span = create_test_span(2, 3, 0, 1, true);
    second_root_span
        .meta
        .insert(String::from("env"), String::from("prod"));

    let chunks = vec![vec![first_root_span], vec![second_root_span]];
    let result = collect_pb_trace_chunks(
        chunks,
        &TracerHeaderTags::default(),
        &mut tracer_payload::DefaultTraceChunkProcessor,
        true,
    )
    .unwrap();

    let TracerPayloadCollection::V07(payloads) = result else {
        panic!("expected TracerPayloadCollection::V07");
    };

    let payload = &payloads[0];
    assert_eq!(payload.env, "prod");
}
1523
/// Builds a trace from a root span without `version` and a second span created with the same
/// ids that does carry `version`, then expects `search_trace_for_field` to return `None`
/// for `version`.
#[test]
fn test_search_trace_for_field_skips_span_with_same_id_as_root() {
    let mut root = create_test_span(1, 1, 0, 1, true);
    root.meta.remove("version");

    // Same first four arguments as the root; only the last flag differs.
    let mut same_id_span = create_test_span(1, 1, 0, 1, false);
    same_id_span
        .meta
        .insert(String::from("version"), String::from("should-not-appear"));

    let trace = vec![root.clone(), same_id_span];
    let found = search_trace_for_field(&root, &trace, "version");
    assert_eq!(found, None);
}
1540}