1pub use crate::send_data::send_data_result::SendDataResult;
5pub use crate::send_data::SendData;
6use crate::span::v05::dict::SharedDict;
7use crate::span::{v05, TraceData};
8pub use crate::tracer_header_tags::TracerHeaderTags;
9use crate::tracer_payload::TracerPayloadCollection;
10use crate::tracer_payload::{self, TraceChunks, TraceEncoding};
11use anyhow::anyhow;
12use bytes::buf::Reader;
13use bytes::Buf;
14use http_body_util::BodyExt;
15use libdd_common::azure_app_services;
16use libdd_trace_normalization::normalizer;
17use libdd_trace_protobuf::pb;
18use rmp::decode::read_array_len;
19use rmpv::decode::read_value;
20use rmpv::{Integer, Value};
21use std::cmp::Ordering;
22use std::collections::{HashMap, HashSet};
23use std::env;
24use tracing::{debug, error};
25
/// Payload size budget in bytes (25 MiB). `coalesce_send_data` only merges two payloads
/// while their combined size stays below half of this value.
pub const MAX_PAYLOAD_SIZE: usize = 25 * 1024 * 1024;
/// Metric key written by `set_top_level_span` and accepted by `has_top_level`.
const TOP_LEVEL_KEY: &str = "_top_level";
/// Second top-level metric key accepted by `has_top_level`; `update_tracer_top_level` reads it
/// when the tracer headers say top-level spans were computed by the client.
const TRACER_TOP_LEVEL_KEY: &str = "_dd.top_level";
/// Metric key inspected by `is_measured`.
const MEASURED_KEY: &str = "_dd.measured";
/// Metric key inspected by `is_partial_snapshot`.
const PARTIAL_VERSION_KEY: &str = "_dd.partial_version";
/// Largest number of entries accepted for a v0.5 string dictionary.
const MAX_STRING_DICT_SIZE: u32 = 25_000_000;
/// Number of elements every encoded v0.5 span array must have.
const SPAN_ELEMENT_COUNT: usize = 12;
38
39pub async fn get_traces_from_request_body<B>(body: B) -> anyhow::Result<(usize, Vec<Vec<pb::Span>>)>
41where
42 B: http_body::Body,
43 B::Error: std::error::Error + Send + Sync + 'static,
44{
45 let buffer = body.collect().await?.aggregate();
46 let size = buffer.remaining();
47
48 let traces: Vec<Vec<pb::Span>> = match rmp_serde::from_read(buffer.reader()) {
49 Ok(res) => res,
50 Err(err) => {
51 anyhow::bail!("Error deserializing trace from request body: {err}")
52 }
53 };
54
55 Ok((size, traces))
56}
57
58#[inline]
59fn get_v05_strings_dict(reader: &mut Reader<impl Buf>) -> anyhow::Result<Vec<String>> {
60 let dict_size =
61 read_array_len(reader).map_err(|err| anyhow!("Error reading dict size: {err}"))?;
62 if dict_size > MAX_STRING_DICT_SIZE {
63 anyhow::bail!(
64 "Error deserializing strings dictionary. Dict size is too large: {dict_size}"
65 );
66 }
67 let mut dict: Vec<String> = Vec::with_capacity(dict_size.try_into()?);
68 for _ in 0..dict_size {
69 match read_value(reader)? {
70 Value::String(s) => {
71 let parsed_string = s.into_str().ok_or_else(|| anyhow!("Error reading string dict"))?;
72 dict.push(parsed_string);
73 }
74 val => anyhow::bail!("Error deserializing strings dictionary. Value in string dict is not a string: {val}")
75 }
76 }
77 Ok(dict)
78}
79
80#[inline]
81fn get_v05_span(reader: &mut Reader<impl Buf>, dict: &[String]) -> anyhow::Result<pb::Span> {
82 let mut span: pb::Span = Default::default();
83 let span_size = rmp::decode::read_array_len(reader)
84 .map_err(|err| anyhow!("Error reading span size: {err}"))? as usize;
85 if span_size != SPAN_ELEMENT_COUNT {
86 anyhow::bail!("Expected an array of exactly 12 elements in a span, got {span_size}");
87 }
88 span.service = get_v05_string(reader, dict, "service")?;
90 span.name = get_v05_string(reader, dict, "name")?;
92 span.resource = get_v05_string(reader, dict, "resource")?;
94
95 match read_value(reader)? {
97 Value::Integer(i) => {
98 span.trace_id = i.as_u64().ok_or_else(|| {
99 anyhow!("Error reading span trace_id, value is not an integer: {i}")
100 })?;
101 }
102 val => anyhow::bail!("Error reading span trace_id, value is not an integer: {val}"),
103 };
104 match read_value(reader)? {
106 Value::Integer(i) => {
107 span.span_id = i.as_u64().ok_or_else(|| {
108 anyhow!("Error reading span span_id, value is not an integer: {i}")
109 })?;
110 }
111 val => anyhow::bail!("Error reading span span_id, value is not an integer: {val}"),
112 };
113 match read_value(reader)? {
115 Value::Integer(i) => {
116 span.parent_id = i.as_u64().ok_or_else(|| {
117 anyhow!("Error reading span parent_id, value is not an integer: {i}")
118 })?;
119 }
120 val => anyhow::bail!("Error reading span parent_id, value is not an integer: {val}"),
121 };
122 match read_value(reader)? {
124 Value::Integer(i) => {
125 span.start = i
126 .as_i64()
127 .ok_or_else(|| anyhow!("Error reading span start, value is not an integer: {i}"))?;
128 }
129 val => anyhow::bail!("Error reading span start, value is not an integer: {val}"),
130 };
131 match read_value(reader)? {
133 Value::Integer(i) => {
134 span.duration = i.as_i64().ok_or_else(|| {
135 anyhow!("Error reading span duration, value is not an integer: {i}")
136 })?;
137 }
138 val => anyhow::bail!("Error reading span duration, value is not an integer: {val}"),
139 };
140 match read_value(reader)? {
142 Value::Integer(i) => {
143 span.error = i
144 .as_i64()
145 .ok_or_else(|| anyhow!("Error reading span error, value is not an integer: {i}"))?
146 as i32;
147 }
148 val => anyhow::bail!("Error reading span error, value is not an integer: {val}"),
149 }
150 match read_value(reader)? {
152 Value::Map(meta) => {
153 for (k, v) in meta.iter() {
154 match k {
155 Value::Integer(k) => {
156 match v {
157 Value::Integer(v) => {
158 let key = str_from_dict(dict, *k)?;
159 let val = str_from_dict(dict, *v)?;
160 span.meta.insert(key, val);
161 }
162 _ => anyhow::bail!("Error reading span meta, value is not an integer and can't be looked up in dict: {v}")
163 }
164 }
165 _ => anyhow::bail!("Error reading span meta, key is not an integer and can't be looked up in dict: {k}")
166 }
167 }
168 }
169 val => anyhow::bail!("Error reading span meta, value is not a map: {val}"),
170 }
171 match read_value(reader)? {
173 Value::Map(metrics) => {
174 for (k, v) in metrics.iter() {
175 match k {
176 Value::Integer(k) => {
177 match v {
178 Value::Integer(v) => {
179 let key = str_from_dict(dict, *k)?;
180 span.metrics.insert(key, v.as_f64().ok_or_else(||anyhow!("Error reading span metrics, value is not an integer: {v}"))?);
181 }
182 Value::F64(v) => {
183 let key = str_from_dict(dict, *k)?;
184 span.metrics.insert(key, *v);
185 }
186 _ => anyhow::bail!(
187 "Error reading span metrics, value is not a float or integer: {v}"
188 ),
189 }
190 }
191 _ => anyhow::bail!("Error reading span metrics, key is not an integer: {k}"),
192 }
193 }
194 }
195 val => anyhow::bail!("Error reading span metrics, value is not a map: {val}"),
196 }
197
198 match read_value(reader)? {
200 Value::Integer(s) => span.r#type = str_from_dict(dict, s)?,
201 val => anyhow::bail!("Error reading span type, value is not an integer: {val}"),
202 }
203 Ok(span)
204}
205
206#[inline]
207fn str_from_dict(dict: &[String], id: Integer) -> anyhow::Result<String> {
208 let id = id
209 .as_i64()
210 .ok_or_else(|| anyhow!("Error reading string from dict, id is not an integer: {id}"))?
211 as usize;
212 if id >= dict.len() {
213 anyhow::bail!("Error reading string from dict, id out of bounds: {id}");
214 }
215 Ok(dict[id].to_string())
216}
217
218#[inline]
219fn get_v05_string(
220 reader: &mut Reader<impl Buf>,
221 dict: &[String],
222 field_name: &str,
223) -> anyhow::Result<String> {
224 match read_value(reader)? {
225 Value::Integer(s) => {
226 str_from_dict(dict, s)
227 },
228 val => anyhow::bail!("Error reading {field_name}, value is not an integer and can't be looked up in dict: {val}")
229 }
230}
231
232pub async fn get_v05_traces_from_request_body<B>(
233 body: B,
234) -> anyhow::Result<(usize, Vec<Vec<pb::Span>>)>
235where
236 B: http_body::Body,
237 B::Error: std::error::Error + Send + Sync + 'static,
238{
239 let buffer = body.collect().await?.aggregate();
240 let body_size = buffer.remaining();
241 let mut reader = buffer.reader();
242 let wrapper_size = read_array_len(&mut reader)?;
243 if wrapper_size != 2 {
244 anyhow::bail!("Expected an arrary of exactly 2 elements, got {wrapper_size}");
245 }
246
247 let dict = get_v05_strings_dict(&mut reader)?;
248
249 let traces_size = rmp::decode::read_array_len(&mut reader)?;
250 let mut traces: Vec<Vec<pb::Span>> = Default::default();
251
252 for _ in 0..traces_size {
253 let spans_size = rmp::decode::read_array_len(&mut reader)?;
254 let mut trace: Vec<pb::Span> = Default::default();
255
256 for _ in 0..spans_size {
257 let span = get_v05_span(&mut reader, &dict)?;
258 trace.push(span);
259 }
260 traces.push(trace);
261 }
262 Ok((body_size, traces))
263}
264
/// Payload-level tags that `collect_pb_trace_chunks` gathers from span meta (agentless mode
/// only) and that `construct_tracer_payload` copies into the resulting `pb::TracerPayload`.
/// Every field defaults to the empty string.
#[derive(Default)]
pub struct TracerPayloadTags {
    /// Taken from the `env` meta tag (after tag normalization).
    pub env: String,
    /// Taken from the `version` meta tag.
    pub app_version: String,
    /// Taken from the `_dd.hostname` meta tag.
    pub hostname: String,
    /// Taken from the `runtime-id` meta tag.
    pub runtime_id: String,
}
273
274fn search_trace_for_field(root: &pb::Span, trace: &[pb::Span], field: &str) -> Option<String> {
277 if let Some(v) = root.meta.get(field) {
278 if !v.is_empty() {
279 return Some(v.clone());
280 }
281 }
282 for span in trace {
283 if span.span_id == root.span_id {
284 continue;
285 }
286 if let Some(v) = span.meta.get(field) {
287 if !v.is_empty() {
288 return Some(v.clone());
289 }
290 }
291 }
292 None
293}
294
295pub(crate) fn construct_trace_chunk(trace: Vec<pb::Span>) -> pb::TraceChunk {
296 pb::TraceChunk {
297 priority: normalizer::SamplerPriority::None as i32,
298 origin: "".to_string(),
299 spans: trace,
300 tags: HashMap::new(),
301 dropped_trace: false,
302 }
303}
304
305pub(crate) fn construct_tracer_payload(
306 chunks: Vec<pb::TraceChunk>,
307 tracer_tags: &TracerHeaderTags,
308 tracer_payload_tags: TracerPayloadTags,
309) -> pb::TracerPayload {
310 pb::TracerPayload {
311 app_version: tracer_payload_tags.app_version,
312 language_name: tracer_tags.lang.to_string(),
313 container_id: tracer_tags.container_id.to_string(),
314 env: tracer_payload_tags.env,
315 runtime_id: tracer_payload_tags.runtime_id,
316 chunks,
317 hostname: tracer_payload_tags.hostname,
318 language_version: tracer_tags.lang_version.to_string(),
319 tags: HashMap::new(),
320 tracer_version: tracer_tags.tracer_version.to_string(),
321 }
322}
323
324pub(crate) fn cmp_send_data_payloads(a: &pb::TracerPayload, b: &pb::TracerPayload) -> Ordering {
325 a.tracer_version
326 .cmp(&b.tracer_version)
327 .then(a.language_version.cmp(&b.language_version))
328 .then(a.language_name.cmp(&b.language_name))
329 .then(a.hostname.cmp(&b.hostname))
330 .then(a.container_id.cmp(&b.container_id))
331 .then(a.runtime_id.cmp(&b.runtime_id))
332 .then(a.env.cmp(&b.env))
333 .then(a.app_version.cmp(&b.app_version))
334}
335
336pub fn coalesce_send_data(mut data: Vec<SendData>) -> Vec<SendData> {
337 data.sort_unstable_by(|a, b| {
340 a.get_target()
341 .url
342 .to_string()
343 .cmp(&b.get_target().url.to_string())
344 .then(a.get_target().test_token.cmp(&b.get_target().test_token))
345 });
346 data.dedup_by(|a, b| {
347 if a.get_target().url == b.get_target().url
348 && a.get_target().test_token == b.get_target().test_token
349 {
350 if a.size + b.size < MAX_PAYLOAD_SIZE / 2 {
355 b.tracer_payloads.append(&mut a.tracer_payloads);
357 b.size += a.size;
358 return true;
359 }
360 }
361 false
362 });
363 for send_data in data.iter_mut() {
366 send_data.tracer_payloads.merge();
367 }
368 data
369}
370
371pub fn get_root_span_index(trace: &[pb::Span]) -> anyhow::Result<usize> {
372 if trace.is_empty() {
373 anyhow::bail!("Cannot find root span index in an empty trace.");
374 }
375
376 for (i, span) in trace.iter().enumerate().rev() {
379 if span.parent_id == 0 {
380 return Ok(i);
381 }
382 }
383
384 let mut span_ids: HashSet<u64> = HashSet::with_capacity(trace.len());
385 for span in trace.iter() {
386 span_ids.insert(span.span_id);
387 }
388
389 let mut root_span_id = None;
390 for (i, span) in trace.iter().enumerate() {
391 if !span_ids.contains(&span.parent_id) {
393 if root_span_id.is_some() {
394 debug!(
395 trace_id = &trace[0].trace_id,
396 "trace has multiple root spans"
397 );
398 }
399 root_span_id = Some(i);
400 }
401 }
402 Ok(match root_span_id {
403 Some(i) => i,
404 None => {
405 debug!(
406 trace_id = &trace[0].trace_id,
407 "Could not find the root span for trace"
408 );
409 trace.len() - 1
410 }
411 })
412}
413
414pub fn compute_top_level_span(trace: &mut [pb::Span]) {
421 let mut span_id_to_service: HashMap<u64, String> = HashMap::new();
422 for span in trace.iter() {
423 span_id_to_service.insert(span.span_id, span.service.clone());
424 }
425 for span in trace.iter_mut() {
426 if span.parent_id == 0 {
427 set_top_level_span(span, true);
428 continue;
429 }
430 match span_id_to_service.get(&span.parent_id) {
431 Some(parent_span_service) => {
432 if !parent_span_service.eq(&span.service) {
433 set_top_level_span(span, true)
435 }
436 }
437 None => {
438 set_top_level_span(span, true)
440 }
441 }
442 }
443}
444
445pub fn has_top_level(span: &pb::Span) -> bool {
447 span.metrics
448 .get(TRACER_TOP_LEVEL_KEY)
449 .is_some_and(|v| *v == 1.0)
450 || span.metrics.get(TOP_LEVEL_KEY).is_some_and(|v| *v == 1.0)
451}
452
453fn set_top_level_span(span: &mut pb::Span, is_top_level: bool) {
454 if is_top_level {
455 span.metrics.insert(TOP_LEVEL_KEY.to_string(), 1.0);
456 } else {
457 span.metrics.remove(TOP_LEVEL_KEY);
458 }
459}
460
461pub fn set_serverless_root_span_tags(
462 span: &mut pb::Span,
463 app_name: Option<String>,
464 env_type: &EnvironmentType,
465) {
466 let origin_tag = match env_type {
467 EnvironmentType::CloudFunction => "cloudfunction",
468 EnvironmentType::AzureFunction => "azurefunction",
469 EnvironmentType::AzureSpringApp => "azurespringapp",
470 EnvironmentType::LambdaFunction => "lambda", };
472 span.meta
473 .insert("_dd.origin".to_string(), origin_tag.to_string());
474 span.meta
475 .insert("origin".to_string(), origin_tag.to_string());
476
477 if let Some(function_name) = app_name {
478 match env_type {
479 EnvironmentType::CloudFunction
480 | EnvironmentType::AzureFunction
481 | EnvironmentType::LambdaFunction => {
482 span.meta.insert("functionname".to_string(), function_name);
483 }
484 _ => {}
485 }
486 }
487}
488
489fn update_tracer_top_level(span: &mut pb::Span) {
490 if span.metrics.contains_key(TRACER_TOP_LEVEL_KEY) {
491 span.metrics.insert(TOP_LEVEL_KEY.to_string(), 1.0);
492 }
493}
494
/// Kind of serverless environment; `set_serverless_root_span_tags` derives the origin tags of a
/// root span from it.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum EnvironmentType {
    /// Tagged with the origin `cloudfunction`.
    CloudFunction,
    /// Tagged with the origin `azurefunction`.
    AzureFunction,
    /// Tagged with the origin `azurespringapp`; no `functionname` tag is set for this variant.
    AzureSpringApp,
    /// Tagged with the origin `lambda`.
    LambdaFunction,
}
502
/// Optional environment metadata used to enrich spans (see
/// `enrich_span_with_mini_agent_metadata` and
/// `enrich_span_with_google_cloud_function_metadata`).
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct MiniAgentMetadata {
    /// Written to the `asa.hostname` meta tag when present.
    pub azure_spring_app_hostname: Option<String>,
    /// Written to the `asa.name` meta tag when present.
    pub azure_spring_app_name: Option<String>,
    /// Used for the `gcrfx.project_id` and `gcrfx.resource_name` meta tags.
    pub gcp_project_id: Option<String>,
    /// Used for the `gcrfx.location` and `gcrfx.resource_name` meta tags.
    pub gcp_region: Option<String>,
    /// Written to the `_dd.serverless_compat_version` meta tag when present; the `Default`
    /// implementation reads it from the `DD_SERVERLESS_COMPAT_VERSION` environment variable.
    pub version: Option<String>,
}
511
512impl Default for MiniAgentMetadata {
513 fn default() -> Self {
514 MiniAgentMetadata {
515 azure_spring_app_hostname: Default::default(),
516 azure_spring_app_name: Default::default(),
517 gcp_project_id: Default::default(),
518 gcp_region: Default::default(),
519 version: env::var("DD_SERVERLESS_COMPAT_VERSION").ok(),
520 }
521 }
522}
523
524pub fn enrich_span_with_mini_agent_metadata(
525 span: &mut pb::Span,
526 mini_agent_metadata: &MiniAgentMetadata,
527) {
528 if let Some(azure_spring_app_hostname) = &mini_agent_metadata.azure_spring_app_hostname {
529 span.meta.insert(
530 "asa.hostname".to_string(),
531 azure_spring_app_hostname.to_string(),
532 );
533 }
534 if let Some(azure_spring_app_name) = &mini_agent_metadata.azure_spring_app_name {
535 span.meta
536 .insert("asa.name".to_string(), azure_spring_app_name.to_string());
537 }
538 if let Some(serverless_compat_version) = &mini_agent_metadata.version {
539 span.meta.insert(
540 "_dd.serverless_compat_version".to_string(),
541 serverless_compat_version.to_string(),
542 );
543 }
544}
545
546pub fn enrich_span_with_google_cloud_function_metadata(
547 span: &mut pb::Span,
548 mini_agent_metadata: &MiniAgentMetadata,
549 function: Option<String>,
550) {
551 #[allow(clippy::todo)]
552 let Some(region) = &mini_agent_metadata.gcp_region
553 else {
554 todo!()
555 };
556 #[allow(clippy::todo)]
557 let Some(project) = &mini_agent_metadata.gcp_project_id
558 else {
559 todo!()
560 };
561
562 if let Some(function) = function {
563 if !region.is_empty() && !project.is_empty() {
564 let resource_name = format!(
565 "projects/{}/locations/{}/functions/{}",
566 project, region, function
567 );
568
569 span.meta
570 .insert("gcrfx.location".to_string(), region.to_string());
571 span.meta
572 .insert("gcrfx.project_id".to_string(), project.to_string());
573 span.meta
574 .insert("gcrfx.resource_name".to_string(), resource_name.to_string());
575 }
576 }
577}
578
579pub fn enrich_span_with_azure_function_metadata(span: &mut pb::Span) {
580 if span.name == "azure.apim" {
581 return;
582 }
583
584 if let Some(aas_metadata) = &*azure_app_services::AAS_METADATA_FUNCTION {
585 span.meta.extend(
586 aas_metadata
587 .get_function_tags()
588 .map(|(name, value)| (name.to_string(), value.to_string())),
589 );
590 }
591}
592
593pub fn collect_trace_chunks<T: TraceData>(
594 traces: Vec<Vec<crate::span::v04::Span<T>>>,
595 format: TraceEncoding,
596) -> anyhow::Result<TraceChunks<T>> {
597 match format {
598 TraceEncoding::V05 => {
599 let mut shared_dict = SharedDict::default();
600 let mut v05_traces: Vec<Vec<v05::Span>> = Vec::with_capacity(traces.len());
601 for trace in traces {
602 let v05_trace = trace
603 .into_iter()
604 .map(|span| v05::from_v04_span(span, &mut shared_dict))
605 .collect::<anyhow::Result<Vec<_>>>()?;
606 v05_traces.push(v05_trace);
607 }
608 Ok(TraceChunks::V05((shared_dict, v05_traces)))
609 }
610 TraceEncoding::V04 => Ok(TraceChunks::V04(traces)),
611 }
612}
613
614pub fn collect_pb_trace_chunks<T: tracer_payload::TraceChunkProcessor>(
615 mut traces: Vec<Vec<pb::Span>>,
616 tracer_header_tags: &TracerHeaderTags,
617 process_chunk: &mut T,
618 is_agentless: bool,
619) -> anyhow::Result<TracerPayloadCollection> {
620 let mut trace_chunks: Vec<pb::TraceChunk> = Vec::new();
621
622 let mut tracer_payload_tags = TracerPayloadTags::default();
624
625 for trace in traces.iter_mut() {
626 if is_agentless {
627 if let Err(e) = normalizer::normalize_trace(trace) {
628 error!("Error normalizing trace: {e}");
629 }
630 }
631
632 let mut chunk = construct_trace_chunk(trace.to_vec());
633
634 let root_span_index = match get_root_span_index(trace) {
635 Ok(res) => res,
636 Err(e) => {
637 error!("Error getting the root span index of a trace, skipping. {e}");
638 continue;
639 }
640 };
641
642 if let Err(e) = normalizer::normalize_chunk(&mut chunk, root_span_index) {
643 error!("Error normalizing trace chunk: {e}");
644 }
645
646 for span in chunk.spans.iter_mut() {
647 if tracer_header_tags.client_computed_top_level {
649 update_tracer_top_level(span);
650 }
651 }
652
653 if !tracer_header_tags.client_computed_top_level {
654 compute_top_level_span(&mut chunk.spans);
655 }
656
657 process_chunk.process(&mut chunk, root_span_index);
658
659 trace_chunks.push(chunk);
660
661 if is_agentless {
662 let root = &trace[root_span_index];
665 if tracer_payload_tags.env.is_empty() {
666 if let Some(mut v) = search_trace_for_field(root, trace, "env") {
667 libdd_trace_normalization::normalize_utils::normalize_tag(&mut v);
670 if !v.is_empty() {
671 tracer_payload_tags.env = v;
672 }
673 }
674 }
675 if tracer_payload_tags.app_version.is_empty() {
676 if let Some(v) = search_trace_for_field(root, trace, "version") {
677 tracer_payload_tags.app_version = v;
678 }
679 }
680 if tracer_payload_tags.hostname.is_empty() {
681 if let Some(v) = search_trace_for_field(root, trace, "_dd.hostname") {
682 tracer_payload_tags.hostname = v;
683 }
684 }
685 if tracer_payload_tags.runtime_id.is_empty() {
686 if let Some(v) = search_trace_for_field(root, trace, "runtime-id") {
687 tracer_payload_tags.runtime_id = v;
688 }
689 }
690 }
691 }
692
693 Ok(TracerPayloadCollection::V07(vec![
694 construct_tracer_payload(trace_chunks, tracer_header_tags, tracer_payload_tags),
695 ]))
696}
697
698pub fn is_measured(span: &pb::Span) -> bool {
700 span.metrics.get(MEASURED_KEY).is_some_and(|v| *v == 1.0)
701}
702
703pub fn is_partial_snapshot(span: &pb::Span) -> bool {
709 span.metrics
710 .get(PARTIAL_VERSION_KEY)
711 .is_some_and(|v| *v >= 0.0)
712}
713
714#[cfg(test)]
715mod tests {
716 use super::*;
717 use crate::{
718 span::SharedDictBytes,
719 test_utils::{create_test_no_alloc_span, create_test_span},
720 };
721 use http::Request;
722 use libdd_common::{http_common, Endpoint};
723 use serde_json::json;
724
725 fn find_index_in_dict(dict: &SharedDictBytes, value: &str) -> Option<u32> {
726 let idx = dict.iter().position(|e| e.as_str() == value);
727 idx.map(|idx| idx.try_into().unwrap())
728 }
729
730 #[test]
731 fn test_coalescing_does_not_exceed_max_size() {
732 fn dummy() -> SendData {
733 SendData::new(
734 MAX_PAYLOAD_SIZE / 5 + 1,
735 TracerPayloadCollection::V07(vec![pb::TracerPayload {
736 container_id: "".to_string(),
737 language_name: "".to_string(),
738 language_version: "".to_string(),
739 tracer_version: "".to_string(),
740 runtime_id: "".to_string(),
741 chunks: vec![pb::TraceChunk {
742 priority: 0,
743 origin: "".to_string(),
744 spans: vec![],
745 tags: Default::default(),
746 dropped_trace: false,
747 }],
748 tags: Default::default(),
749 env: "".to_string(),
750 hostname: "".to_string(),
751 app_version: "".to_string(),
752 }]),
753 TracerHeaderTags::default(),
754 &Endpoint::default(),
755 )
756 }
757 let coalesced = coalesce_send_data(vec![dummy(), dummy(), dummy(), dummy(), dummy()]);
758 assert_eq!(
759 5,
760 coalesced
761 .iter()
762 .map(|s| s.tracer_payloads.size())
763 .sum::<usize>()
764 );
765 assert!(
767 coalesced
768 .iter()
769 .map(|s| {
770 if let TracerPayloadCollection::V07(collection) = &s.tracer_payloads {
771 collection.iter().map(|s| s.chunks.len()).max().unwrap()
772 } else {
773 0
774 }
775 })
776 .max()
777 .unwrap()
778 > 1
779 );
780 assert!(coalesced.len() > 1 && coalesced.len() < 5);
781 }
782
783 #[tokio::test]
784 #[allow(clippy::type_complexity)]
785 #[cfg_attr(all(miri, target_os = "macos"), ignore)]
786 async fn test_get_v05_traces_from_request_body() {
787 let data: (
788 Vec<String>,
789 Vec<
790 Vec<(
791 u8,
792 u8,
793 u8,
794 u64,
795 u64,
796 u64,
797 i64,
798 i64,
799 i32,
800 HashMap<u8, u8>,
801 HashMap<u8, f64>,
802 u8,
803 )>,
804 >,
805 ) = (
806 vec![
807 "baggage".to_string(),
808 "item".to_string(),
809 "elasticsearch.version".to_string(),
810 "7.0".to_string(),
811 "my-name".to_string(),
812 "X".to_string(),
813 "my-service".to_string(),
814 "my-resource".to_string(),
815 "_dd.sampling_rate_whatever".to_string(),
816 "value whatever".to_string(),
817 "sql".to_string(),
818 ],
819 vec![vec![(
820 6,
821 4,
822 7,
823 1,
824 2,
825 3,
826 123,
827 456,
828 1,
829 HashMap::from([(8, 9), (0, 1), (2, 3)]),
830 HashMap::from([(5, 1.2)]),
831 10,
832 )]],
833 );
834 let bytes = rmp_serde::to_vec(&data).unwrap();
835 let res = get_v05_traces_from_request_body(http_common::Body::from(bytes)).await;
836 assert!(res.is_ok());
837 let (_, traces) = res.unwrap();
838 let span = traces[0][0].clone();
839 let test_span = pb::Span {
840 service: "my-service".to_string(),
841 name: "my-name".to_string(),
842 resource: "my-resource".to_string(),
843 trace_id: 1,
844 span_id: 2,
845 parent_id: 3,
846 start: 123,
847 duration: 456,
848 error: 1,
849 meta: HashMap::from([
850 ("baggage".to_string(), "item".to_string()),
851 ("elasticsearch.version".to_string(), "7.0".to_string()),
852 (
853 "_dd.sampling_rate_whatever".to_string(),
854 "value whatever".to_string(),
855 ),
856 ]),
857 metrics: HashMap::from([("X".to_string(), 1.2)]),
858 meta_struct: HashMap::default(),
859 r#type: "sql".to_string(),
860 span_links: vec![],
861 span_events: vec![],
862 };
863 assert_eq!(span, test_span);
864 }
865
866 #[tokio::test]
867 #[cfg_attr(miri, ignore)]
868 async fn test_get_traces_from_request_body() {
869 let pairs = vec![
870 (
871 json!([{
872 "service": "test-service",
873 "name": "test-service-name",
874 "resource": "test-service-resource",
875 "trace_id": 111,
876 "span_id": 222,
877 "parent_id": 333,
878 "start": 1,
879 "duration": 5,
880 "error": 0,
881 "meta": {},
882 "metrics": {},
883 }]),
884 vec![vec![pb::Span {
885 service: "test-service".to_string(),
886 name: "test-service-name".to_string(),
887 resource: "test-service-resource".to_string(),
888 trace_id: 111,
889 span_id: 222,
890 parent_id: 333,
891 start: 1,
892 duration: 5,
893 error: 0,
894 meta: HashMap::new(),
895 metrics: HashMap::new(),
896 meta_struct: HashMap::new(),
897 r#type: "".to_string(),
898 span_links: vec![],
899 span_events: vec![],
900 }]],
901 ),
902 (
903 json!([{
904 "name": "test-service-name",
905 "resource": "test-service-resource",
906 "trace_id": 111,
907 "span_id": 222,
908 "start": 1,
909 "duration": 5,
910 "meta": {},
911 }]),
912 vec![vec![pb::Span {
913 service: "".to_string(),
914 name: "test-service-name".to_string(),
915 resource: "test-service-resource".to_string(),
916 trace_id: 111,
917 span_id: 222,
918 parent_id: 0,
919 start: 1,
920 duration: 5,
921 error: 0,
922 meta: HashMap::new(),
923 metrics: HashMap::new(),
924 meta_struct: HashMap::new(),
925 r#type: "".to_string(),
926 span_links: vec![],
927 span_events: vec![],
928 }]],
929 ),
930 ];
931
932 for (trace_input, output) in pairs {
933 let bytes = rmp_serde::to_vec(&vec![&trace_input]).unwrap();
934 let request = Request::builder()
935 .body(http_common::Body::from(bytes))
936 .unwrap();
937 let res = get_traces_from_request_body(request.into_body()).await;
938 assert!(res.is_ok());
939 assert_eq!(res.unwrap().1, output);
940 }
941 }
942
943 #[tokio::test]
944 #[cfg_attr(miri, ignore)]
945 async fn test_get_traces_from_request_body_with_span_links() {
946 let trace_input = json!([[{
947 "service": "test-service",
948 "name": "test-name",
949 "resource": "test-resource",
950 "trace_id": 111,
951 "span_id": 222,
952 "parent_id": 333,
953 "start": 1,
954 "duration": 5,
955 "error": 0,
956 "meta": {},
957 "metrics": {},
958 "span_links": [{
959 "trace_id": 999,
960 "span_id": 888,
961 "trace_id_high": 777,
962 "attributes": {"key": "value"},
963 "tracestate": "vendor=value"
964 }]
966 }]]);
967
968 let expected_output = vec![vec![pb::Span {
969 service: "test-service".to_string(),
970 name: "test-name".to_string(),
971 resource: "test-resource".to_string(),
972 trace_id: 111,
973 span_id: 222,
974 parent_id: 333,
975 start: 1,
976 duration: 5,
977 error: 0,
978 meta: HashMap::new(),
979 metrics: HashMap::new(),
980 meta_struct: HashMap::new(),
981 r#type: String::new(),
982 span_links: vec![pb::SpanLink {
983 trace_id: 999,
984 span_id: 888,
985 trace_id_high: 777,
986 attributes: HashMap::from([("key".to_string(), "value".to_string())]),
987 tracestate: "vendor=value".to_string(),
988 flags: 0, }],
990 span_events: vec![],
991 }]];
992
993 let bytes = rmp_serde::to_vec(&trace_input).unwrap();
994 let request = Request::builder()
995 .body(http_common::Body::from(bytes))
996 .unwrap();
997
998 let res = get_traces_from_request_body(request.into_body()).await;
999 assert!(res.is_ok(), "Failed to deserialize: {res:?}");
1000 assert_eq!(res.unwrap().1, expected_output);
1001 }
1002
1003 #[test]
1004 fn test_get_root_span_index_from_complete_trace() {
1005 let trace = vec![
1006 create_test_span(1234, 12341, 0, 1, false),
1007 create_test_span(1234, 12342, 12341, 1, false),
1008 create_test_span(1234, 12343, 12342, 1, false),
1009 ];
1010
1011 let root_span_index = get_root_span_index(&trace);
1012 assert!(root_span_index.is_ok());
1013 assert_eq!(root_span_index.unwrap(), 0);
1014 }
1015
1016 #[test]
1017 fn test_get_root_span_index_from_partial_trace() {
1018 let trace = vec![
1019 create_test_span(1234, 12342, 12341, 1, false),
1020 create_test_span(1234, 12341, 12340, 1, false), create_test_span(1234, 12343, 12342, 1, false),
1023 ];
1024
1025 let root_span_index = get_root_span_index(&trace);
1026 assert!(root_span_index.is_ok());
1027 assert_eq!(root_span_index.unwrap(), 1);
1028 }
1029
1030 #[test]
1031 fn test_set_serverless_root_span_tags_azure_function() {
1032 let mut span = create_test_span(1234, 12342, 12341, 1, false);
1033 set_serverless_root_span_tags(
1034 &mut span,
1035 Some("test_function".to_string()),
1036 &EnvironmentType::AzureFunction,
1037 );
1038 assert_eq!(
1039 span.meta,
1040 HashMap::from([
1041 (
1042 "runtime-id".to_string(),
1043 "test-runtime-id-value".to_string()
1044 ),
1045 ("_dd.origin".to_string(), "azurefunction".to_string()),
1046 ("origin".to_string(), "azurefunction".to_string()),
1047 ("functionname".to_string(), "test_function".to_string()),
1048 ("env".to_string(), "test-env".to_string()),
1049 ("service".to_string(), "test-service".to_string())
1050 ]),
1051 );
1052 }
1053
1054 #[test]
1055 fn test_set_serverless_root_span_tags_cloud_function() {
1056 let mut span = create_test_span(1234, 12342, 12341, 1, false);
1057 set_serverless_root_span_tags(
1058 &mut span,
1059 Some("test_function".to_string()),
1060 &EnvironmentType::CloudFunction,
1061 );
1062 assert_eq!(
1063 span.meta,
1064 HashMap::from([
1065 (
1066 "runtime-id".to_string(),
1067 "test-runtime-id-value".to_string()
1068 ),
1069 ("_dd.origin".to_string(), "cloudfunction".to_string()),
1070 ("origin".to_string(), "cloudfunction".to_string()),
1071 ("functionname".to_string(), "test_function".to_string()),
1072 ("env".to_string(), "test-env".to_string()),
1073 ("service".to_string(), "test-service".to_string())
1074 ]),
1075 );
1076 }
1077
1078 #[test]
1079 fn test_has_top_level() {
1080 let top_level_span = create_test_span(123, 1234, 12, 1, true);
1081 let not_top_level_span = create_test_span(123, 1234, 12, 1, false);
1082 assert!(has_top_level(&top_level_span));
1083 assert!(!has_top_level(¬_top_level_span));
1084 }
1085
1086 #[test]
1087 fn test_is_measured() {
1088 let mut measured_span = create_test_span(123, 1234, 12, 1, true);
1089 measured_span.metrics.insert(MEASURED_KEY.into(), 1.0);
1090 let not_measured_span = create_test_span(123, 1234, 12, 1, true);
1091 assert!(is_measured(&measured_span));
1092 assert!(!is_measured(¬_measured_span));
1093 }
1094
1095 #[test]
1096 fn test_compute_top_level() {
1097 let mut span_with_different_service = create_test_span(123, 5, 2, 1, false);
1098 span_with_different_service.service = "another_service".into();
1099 let mut trace = vec![
1100 create_test_span(123, 1, 0, 1, false),
1102 create_test_span(123, 2, 1, 1, false),
1104 create_test_span(123, 4, 3, 1, false),
1107 span_with_different_service,
1110 ];
1111
1112 compute_top_level_span(trace.as_mut_slice());
1113
1114 let spans_marked_as_top_level: Vec<u64> = trace
1115 .iter()
1116 .filter_map(|span| {
1117 if has_top_level(span) {
1118 Some(span.span_id)
1119 } else {
1120 None
1121 }
1122 })
1123 .collect();
1124 assert_eq!(spans_marked_as_top_level, [1, 4, 5])
1125 }
1126
/// Verifies that `collect_trace_chunks` with `TraceEncoding::V05` produces a
/// `TraceChunks::V05` payload: a string dictionary plus spans whose string
/// fields, meta entries and metrics keys are indices into that dictionary.
#[test]
fn test_collect_trace_chunks_v05() {
    let chunk = vec![create_test_no_alloc_span(123, 456, 789, 1, true)];

    let collection = collect_trace_chunks(vec![chunk], TraceEncoding::V05).unwrap();

    let TraceChunks::V05((dict, traces)) = collection else {
        panic!("Unexpected type");
    };

    assert_eq!(dict.len(), 16);

    let span = &traces[0][0];
    assert_eq!(span.service, 1);
    assert_eq!(span.name, 2);
    assert_eq!(span.resource, 3);
    assert_eq!(span.trace_id, 123);
    assert_eq!(span.span_id, 456);
    assert_eq!(span.parent_id, 789);
    assert_eq!(span.start, 1);
    // NOTE(review): `error` used to be asserted twice in a row; the duplicate was
    // possibly meant to cover another field (e.g. `duration`) — confirm the
    // fixture's value before adding such an assertion.
    assert_eq!(span.error, 0);
    assert_eq!(span.r#type, 15);

    // Every meta entry must map the dictionary index of its key to the
    // dictionary index of its value.
    let expected_meta = [
        ("service", "test-service"),
        ("env", "test-env"),
        ("runtime-id", "test-runtime-id-value"),
        ("_dd.origin", "cloudfunction"),
        ("origin", "cloudfunction"),
        ("functionname", "dummy_function_name"),
    ];
    for (key, value) in expected_meta {
        let key_index = find_index_in_dict(&dict, key).unwrap();
        let value_index = find_index_in_dict(&dict, value).unwrap();
        assert_eq!(
            *span.meta.get(&key_index).unwrap(),
            value_index,
            "unexpected dictionary index stored for meta key {key:?}"
        );
    }

    // The metric is keyed by the dictionary index of `_top_level`.
    let top_level_index = find_index_in_dict(&dict, "_top_level").unwrap();
    assert_eq!(*span.metrics.get(&top_level_index).unwrap(), 1.0);
}
1201
/// Checks that `collect_trace_chunks` with `TraceEncoding::V04` returns a
/// `TraceChunks::V04` holding exactly the one input span, with its ids, start
/// and error fields unchanged.
#[test]
fn test_collect_trace_chunks_v04() {
    let input = vec![vec![create_test_no_alloc_span(123, 456, 789, 1, true)]];

    let collected = collect_trace_chunks(input, TraceEncoding::V04).unwrap();
    let TraceChunks::V04(chunks) = collected else {
        panic!("Unexpected type");
    };

    // One chunk containing one span.
    assert_eq!(chunks.len(), 1);
    assert_eq!(chunks[0].len(), 1);

    let only_span = &chunks[0][0];
    assert_eq!(only_span.trace_id, 123);
    assert_eq!(only_span.span_id, 456);
    assert_eq!(only_span.parent_id, 789);
    assert_eq!(only_span.start, 1);
    assert_eq!(only_span.error, 0);
}
1222
/// Encodes a span whose `meta` map contains a `null` value with `rmp_serde`
/// and checks that decoding into `pb::Span` succeeds, keeps the string-valued
/// entries and leaves the null-valued key out of the decoded map.
#[test]
fn test_rmp_serde_deserialize_meta_with_null_values() {
    let span_json = json!({
        "service": "test-service",
        "name": "test_name",
        "resource": "test-resource",
        "trace_id": 1_u64,
        "span_id": 2_u64,
        "parent_id": 0_u64,
        "start": 0_i64,
        "duration": 5_i64,
        "error": 0_i32,
        "meta": {
            "service": "test-service",
            "env": "test-env",
            "runtime-id": "test-runtime-id-value",
            "problematic_key": null
        },
        "metrics": {},
        "type": "",
        "meta_struct": {},
        "span_links": [],
        "span_events": []
    });

    // A payload is a list of traces, each trace a list of spans.
    let encoded = rmp_serde::to_vec(&vec![vec![span_json]]).unwrap();
    let decoded: Vec<Vec<pb::Span>> = rmp_serde::from_read(encoded.as_slice())
        .expect("Failed to deserialize traces with null values in meta");

    assert_eq!(decoded.len(), 1);
    assert_eq!(decoded[0].len(), 1);
    let span = &decoded[0][0];

    assert_eq!(span.service, "test-service");
    assert_eq!(span.name, "test_name");
    assert_eq!(span.resource, "test-resource");

    // String-valued meta entries survive the round trip.
    let surviving_meta = [
        ("service", "test-service"),
        ("env", "test-env"),
        ("runtime-id", "test-runtime-id-value"),
    ];
    for (key, expected) in surviving_meta {
        assert_eq!(span.meta.get(key).map(String::as_str), Some(expected));
    }

    assert!(
        !span.meta.contains_key("problematic_key"),
        "Null value should be skipped, but key was present"
    );
}
1273
/// Checks that enriching a span named `azure.function` adds the full set of
/// `aas.*` tags to its meta.
///
/// The tag assertions only run when `azure_app_services::AAS_METADATA_FUNCTION`
/// is `Some`; otherwise the test only exercises the enrichment call.
#[test]
fn test_enrich_span_with_azure_function_metadata_adds_tags_for_non_apim() {
    let expected_tags = [
        "aas.resource.id",
        "aas.environment.instance_id",
        "aas.environment.instance_name",
        "aas.subscription.id",
        "aas.environment.os",
        "aas.environment.runtime",
        "aas.environment.runtime_version",
        "aas.environment.function_runtime",
        "aas.resource.group",
        "aas.site.name",
        "aas.site.kind",
        "aas.site.type",
    ];

    let mut span = create_test_span(1234, 12342, 12341, 1, false);
    span.name = "azure.function".to_string();

    enrich_span_with_azure_function_metadata(&mut span);

    if azure_app_services::AAS_METADATA_FUNCTION.is_some() {
        // Collect every missing tag so a failure reports all of them at once
        // instead of stopping at the first one.
        let missing: Vec<&str> = expected_tags
            .iter()
            .copied()
            .filter(|tag| !span.meta.contains_key(*tag))
            .collect();
        assert!(
            missing.is_empty(),
            "span meta is missing expected aas tags: {missing:?}"
        );
    }
}
1299
/// Checks that enriching a span named `azure.apim` leaves none of the
/// `aas.*` tags in its meta.
#[test]
fn test_enrich_span_with_azure_function_metadata_skips_azure_apim() {
    let forbidden_tags = [
        "aas.resource.id",
        "aas.environment.instance_id",
        "aas.environment.instance_name",
        "aas.subscription.id",
        "aas.environment.os",
        "aas.environment.runtime",
        "aas.environment.runtime_version",
        "aas.environment.function_runtime",
        "aas.resource.group",
        "aas.site.name",
        "aas.site.kind",
        "aas.site.type",
    ];

    let mut span = create_test_span(1234, 12342, 12341, 1, false);
    span.name = "azure.apim".to_string();

    enrich_span_with_azure_function_metadata(&mut span);

    // Collect every tag that slipped through so a failure reports all of them
    // at once instead of stopping at the first one.
    let present: Vec<&str> = forbidden_tags
        .iter()
        .copied()
        .filter(|tag| span.meta.contains_key(*tag))
        .collect();
    assert!(
        present.is_empty(),
        "azure.apim span unexpectedly received aas tags: {present:?}"
    );
}
1321
/// Feeds two single-span chunks to `collect_pb_trace_chunks`: the first root
/// span has `env` and `runtime-id` removed, the second carries `version`,
/// `env`, `_dd.hostname` and `runtime-id`. Expects the first V07 payload to
/// report the values set on the second root span.
#[test]
fn test_collect_pb_trace_chunks_searches_multiple_root_spans_for_fields() {
    let mut first_root_span = create_test_span(1, 1, 0, 1, true);
    for key in ["env", "runtime-id"] {
        first_root_span.meta.remove(key);
    }

    let mut second_root_span = create_test_span(2, 3, 0, 1, true);
    let second_root_tags = [
        ("version", "1.2.3"),
        ("env", "prod"),
        ("_dd.hostname", "my-host"),
        ("runtime-id", "123"),
    ];
    for (key, value) in second_root_tags {
        second_root_span
            .meta
            .insert(key.to_string(), value.to_string());
    }

    let collected = collect_pb_trace_chunks(
        vec![vec![first_root_span], vec![second_root_span]],
        &TracerHeaderTags::default(),
        &mut tracer_payload::DefaultTraceChunkProcessor,
        true,
    )
    .unwrap();

    let payloads = match collected {
        TracerPayloadCollection::V07(payloads) => payloads,
        _ => panic!("expected TracerPayloadCollection::V07"),
    };

    let payload = &payloads[0];
    assert_eq!(payload.app_version, "1.2.3");
    assert_eq!(payload.env, "prod");
    assert_eq!(payload.hostname, "my-host");
    assert_eq!(payload.runtime_id, "123");
}
1360
/// Feeds one chunk holding a root span (with `env` and `runtime-id` removed)
/// and a child span carrying `version`, `env`, `_dd.hostname` and
/// `runtime-id`. Expects the first V07 payload to report the child's values.
#[test]
fn test_collect_pb_trace_chunks_searches_non_root_spans_for_fields() {
    let mut root_span = create_test_span(1, 1, 0, 1, true);
    for key in ["env", "runtime-id"] {
        root_span.meta.remove(key);
    }

    let mut child_span = create_test_span(1, 2, 1, 1, false);
    let child_tags = [
        ("version", "1.2.3"),
        ("env", "prod"),
        ("_dd.hostname", "my-host"),
        ("runtime-id", "123"),
    ];
    for (key, value) in child_tags {
        child_span.meta.insert(key.to_string(), value.to_string());
    }

    let collected = collect_pb_trace_chunks(
        vec![vec![root_span, child_span]],
        &TracerHeaderTags::default(),
        &mut tracer_payload::DefaultTraceChunkProcessor,
        true,
    )
    .unwrap();

    let payloads = match collected {
        TracerPayloadCollection::V07(payloads) => payloads,
        _ => panic!("expected TracerPayloadCollection::V07"),
    };

    let payload = &payloads[0];
    assert_eq!(payload.app_version, "1.2.3");
    assert_eq!(payload.env, "prod");
    assert_eq!(payload.hostname, "my-host");
    assert_eq!(payload.runtime_id, "123");
}
1398
/// Gives both the root span and its child their own `version`, `env`,
/// `_dd.hostname` and `runtime-id` values and expects the first V07 payload to
/// report the root span's values.
#[test]
fn test_collect_pb_trace_chunks_root_span_takes_priority_over_child() {
    let mut root_span = create_test_span(1, 1, 0, 1, true);
    let root_tags = [
        ("version", "root-version"),
        ("env", "root-env"),
        ("_dd.hostname", "root-host"),
        ("runtime-id", "root-runtime-id"),
    ];
    for (key, value) in root_tags {
        root_span.meta.insert(key.to_string(), value.to_string());
    }

    let mut child_span = create_test_span(1, 2, 1, 1, false);
    let child_tags = [
        ("version", "child-version"),
        ("env", "child-env"),
        ("_dd.hostname", "child-host"),
        ("runtime-id", "child-runtime-id"),
    ];
    for (key, value) in child_tags {
        child_span.meta.insert(key.to_string(), value.to_string());
    }

    let collected = collect_pb_trace_chunks(
        vec![vec![root_span, child_span]],
        &TracerHeaderTags::default(),
        &mut tracer_payload::DefaultTraceChunkProcessor,
        true,
    )
    .unwrap();

    let payloads = match collected {
        TracerPayloadCollection::V07(payloads) => payloads,
        _ => panic!("expected TracerPayloadCollection::V07"),
    };

    let payload = &payloads[0];
    assert_eq!(payload.app_version, "root-version");
    assert_eq!(payload.env, "root-env");
    assert_eq!(payload.hostname, "root-host");
    assert_eq!(payload.runtime_id, "root-runtime-id");
}
1447
/// Sets `version`, `env`, `_dd.hostname` and `runtime-id` to empty strings on
/// the root span and to real values on the child span, then expects the first
/// V07 payload to report the child's non-empty values.
#[test]
fn test_collect_pb_trace_chunks_skips_empty_root_span_value() {
    let mut root_span = create_test_span(1, 1, 0, 1, true);
    for key in ["version", "env", "_dd.hostname", "runtime-id"] {
        root_span.meta.insert(key.to_string(), "".to_string());
    }

    let mut child_span = create_test_span(1, 2, 1, 1, false);
    let child_tags = [
        ("version", "1.2.3"),
        ("env", "prod"),
        ("_dd.hostname", "my-host"),
        ("runtime-id", "123"),
    ];
    for (key, value) in child_tags {
        child_span.meta.insert(key.to_string(), value.to_string());
    }

    let collected = collect_pb_trace_chunks(
        vec![vec![root_span, child_span]],
        &TracerHeaderTags::default(),
        &mut tracer_payload::DefaultTraceChunkProcessor,
        true,
    )
    .unwrap();

    let payloads = match collected {
        TracerPayloadCollection::V07(payloads) => payloads,
        _ => panic!("expected TracerPayloadCollection::V07"),
    };

    let payload = &payloads[0];
    assert_eq!(payload.app_version, "1.2.3");
    assert_eq!(payload.env, "prod");
    assert_eq!(payload.hostname, "my-host");
    assert_eq!(payload.runtime_id, "123");
}
1492
/// Sets the root span's `env` tag to `PRODUCTION` and expects the first V07
/// payload to expose it as `production`.
#[test]
fn test_collect_pb_trace_chunks_normalizes_env() {
    let mut root_span = create_test_span(1, 1, 0, 1, true);
    root_span
        .meta
        .insert(String::from("env"), String::from("PRODUCTION"));

    let collected = collect_pb_trace_chunks(
        vec![vec![root_span]],
        &TracerHeaderTags::default(),
        &mut tracer_payload::DefaultTraceChunkProcessor,
        true,
    )
    .unwrap();

    let payloads = match collected {
        TracerPayloadCollection::V07(payloads) => payloads,
        _ => panic!("expected TracerPayloadCollection::V07"),
    };

    assert_eq!(payloads[0].env, "production");
}
1512
/// Feeds two single-span chunks whose root spans have `env` set to `!!!` and
/// `prod` respectively, and expects the first V07 payload's env to be `prod`.
///
/// NOTE(review): per the test name, `!!!` presumably normalizes to an empty
/// string and is therefore skipped — confirm against the normalizer.
#[test]
fn test_collect_pb_trace_chunks_skips_env_empty_after_normalization() {
    let mut first_root_span = create_test_span(1, 1, 0, 1, true);
    first_root_span
        .meta
        .insert(String::from("env"), String::from("!!!"));

    let mut second_root_span = create_test_span(2, 3, 0, 1, true);
    second_root_span
        .meta
        .insert(String::from("env"), String::from("prod"));

    let collected = collect_pb_trace_chunks(
        vec![vec![first_root_span], vec![second_root_span]],
        &TracerHeaderTags::default(),
        &mut tracer_payload::DefaultTraceChunkProcessor,
        true,
    )
    .unwrap();

    let payloads = match collected {
        TracerPayloadCollection::V07(payloads) => payloads,
        _ => panic!("expected TracerPayloadCollection::V07"),
    };

    assert_eq!(payloads[0].env, "prod");
}
1540
/// Builds a trace of a root span without a `version` tag plus a second span
/// that reuses the root's ids and does carry `version`, then expects
/// `search_trace_for_field` to return `None` for `version`.
#[test]
fn test_search_trace_for_field_skips_span_with_same_id_as_root() {
    // Root span with any `version` tag removed.
    let root = {
        let mut span = create_test_span(1, 1, 0, 1, true);
        span.meta.remove("version");
        span
    };

    // Same trace/span ids as the root, but tagged with a `version`.
    let same_id_span = {
        let mut span = create_test_span(1, 1, 0, 1, false);
        span.meta
            .insert("version".to_string(), "should-not-appear".to_string());
        span
    };

    let trace = vec![root.clone(), same_id_span];
    let found = search_trace_for_field(&root, &trace, "version");
    assert_eq!(found, None);
}
1557}