1pub use crate::send_data::send_data_result::SendDataResult;
5pub use crate::send_data::SendData;
6use crate::span::v05::dict::SharedDict;
7use crate::span::{v05, TraceData};
8pub use crate::tracer_header_tags::TracerHeaderTags;
9use crate::tracer_payload::TracerPayloadCollection;
10use crate::tracer_payload::{self, TraceChunks};
11use anyhow::anyhow;
12use bytes::buf::Reader;
13use bytes::Buf;
14use http_body_util::BodyExt;
15use libdd_common::azure_app_services;
16use libdd_trace_normalization::normalizer;
17use libdd_trace_protobuf::pb;
18use rmp::decode::read_array_len;
19use rmpv::decode::read_value;
20use rmpv::{Integer, Value};
21use std::cmp::Ordering;
22use std::collections::{HashMap, HashSet};
23use std::env;
24use tracing::{debug, error};
25
/// Upper bound on a payload's size (25 MiB; presumably bytes). `coalesce_send_data` only
/// merges two `SendData` while their combined `size` stays below half of this value.
pub const MAX_PAYLOAD_SIZE: usize = 25 * 1024 * 1024;
/// Metric key set to `1.0` by `set_top_level_span` on spans considered top-level.
const TOP_LEVEL_KEY: &str = "_top_level";
/// Metric key a tracer may set itself; when present, `update_tracer_top_level` copies the
/// top-level marker into [`TOP_LEVEL_KEY`].
const TRACER_TOP_LEVEL_KEY: &str = "_dd.top_level";
/// Metric key whose value `1.0` marks a span as measured (see `is_measured`).
const MEASURED_KEY: &str = "_dd.measured";
/// Metric key whose non-negative value marks a span as a partial snapshot
/// (see `is_partial_snapshot`).
const PARTIAL_VERSION_KEY: &str = "_dd.partial_version";
/// Maximum number of entries accepted in a v0.5 payload's string dictionary.
const MAX_STRING_DICT_SIZE: u32 = 25_000_000;
/// Number of elements a v0.5 span array must contain.
const SPAN_ELEMENT_COUNT: usize = 12;
38
39pub async fn get_traces_from_request_body<B>(body: B) -> anyhow::Result<(usize, Vec<Vec<pb::Span>>)>
41where
42 B: http_body::Body,
43 B::Error: std::error::Error + Send + Sync + 'static,
44{
45 let buffer = body.collect().await?.aggregate();
46 let size = buffer.remaining();
47
48 let traces: Vec<Vec<pb::Span>> = match rmp_serde::from_read(buffer.reader()) {
49 Ok(res) => res,
50 Err(err) => {
51 anyhow::bail!("Error deserializing trace from request body: {err}")
52 }
53 };
54
55 Ok((size, traces))
56}
57
58#[inline]
59fn get_v05_strings_dict(reader: &mut Reader<impl Buf>) -> anyhow::Result<Vec<String>> {
60 let dict_size =
61 read_array_len(reader).map_err(|err| anyhow!("Error reading dict size: {err}"))?;
62 if dict_size > MAX_STRING_DICT_SIZE {
63 anyhow::bail!(
64 "Error deserializing strings dictionary. Dict size is too large: {dict_size}"
65 );
66 }
67 let mut dict: Vec<String> = Vec::with_capacity(dict_size.try_into()?);
68 for _ in 0..dict_size {
69 match read_value(reader)? {
70 Value::String(s) => {
71 let parsed_string = s.into_str().ok_or_else(|| anyhow!("Error reading string dict"))?;
72 dict.push(parsed_string);
73 }
74 val => anyhow::bail!("Error deserializing strings dictionary. Value in string dict is not a string: {val}")
75 }
76 }
77 Ok(dict)
78}
79
80#[inline]
81fn get_v05_span(reader: &mut Reader<impl Buf>, dict: &[String]) -> anyhow::Result<pb::Span> {
82 let mut span: pb::Span = Default::default();
83 let span_size = rmp::decode::read_array_len(reader)
84 .map_err(|err| anyhow!("Error reading span size: {err}"))? as usize;
85 if span_size != SPAN_ELEMENT_COUNT {
86 anyhow::bail!("Expected an array of exactly 12 elements in a span, got {span_size}");
87 }
88 span.service = get_v05_string(reader, dict, "service")?;
90 span.name = get_v05_string(reader, dict, "name")?;
92 span.resource = get_v05_string(reader, dict, "resource")?;
94
95 match read_value(reader)? {
97 Value::Integer(i) => {
98 span.trace_id = i.as_u64().ok_or_else(|| {
99 anyhow!("Error reading span trace_id, value is not an integer: {i}")
100 })?;
101 }
102 val => anyhow::bail!("Error reading span trace_id, value is not an integer: {val}"),
103 };
104 match read_value(reader)? {
106 Value::Integer(i) => {
107 span.span_id = i.as_u64().ok_or_else(|| {
108 anyhow!("Error reading span span_id, value is not an integer: {i}")
109 })?;
110 }
111 val => anyhow::bail!("Error reading span span_id, value is not an integer: {val}"),
112 };
113 match read_value(reader)? {
115 Value::Integer(i) => {
116 span.parent_id = i.as_u64().ok_or_else(|| {
117 anyhow!("Error reading span parent_id, value is not an integer: {i}")
118 })?;
119 }
120 val => anyhow::bail!("Error reading span parent_id, value is not an integer: {val}"),
121 };
122 match read_value(reader)? {
124 Value::Integer(i) => {
125 span.start = i
126 .as_i64()
127 .ok_or_else(|| anyhow!("Error reading span start, value is not an integer: {i}"))?;
128 }
129 val => anyhow::bail!("Error reading span start, value is not an integer: {val}"),
130 };
131 match read_value(reader)? {
133 Value::Integer(i) => {
134 span.duration = i.as_i64().ok_or_else(|| {
135 anyhow!("Error reading span duration, value is not an integer: {i}")
136 })?;
137 }
138 val => anyhow::bail!("Error reading span duration, value is not an integer: {val}"),
139 };
140 match read_value(reader)? {
142 Value::Integer(i) => {
143 span.error = i
144 .as_i64()
145 .ok_or_else(|| anyhow!("Error reading span error, value is not an integer: {i}"))?
146 as i32;
147 }
148 val => anyhow::bail!("Error reading span error, value is not an integer: {val}"),
149 }
150 match read_value(reader)? {
152 Value::Map(meta) => {
153 for (k, v) in meta.iter() {
154 match k {
155 Value::Integer(k) => {
156 match v {
157 Value::Integer(v) => {
158 let key = str_from_dict(dict, *k)?;
159 let val = str_from_dict(dict, *v)?;
160 span.meta.insert(key, val);
161 }
162 _ => anyhow::bail!("Error reading span meta, value is not an integer and can't be looked up in dict: {v}")
163 }
164 }
165 _ => anyhow::bail!("Error reading span meta, key is not an integer and can't be looked up in dict: {k}")
166 }
167 }
168 }
169 val => anyhow::bail!("Error reading span meta, value is not a map: {val}"),
170 }
171 match read_value(reader)? {
173 Value::Map(metrics) => {
174 for (k, v) in metrics.iter() {
175 match k {
176 Value::Integer(k) => {
177 match v {
178 Value::Integer(v) => {
179 let key = str_from_dict(dict, *k)?;
180 span.metrics.insert(key, v.as_f64().ok_or_else(||anyhow!("Error reading span metrics, value is not an integer: {v}"))?);
181 }
182 Value::F64(v) => {
183 let key = str_from_dict(dict, *k)?;
184 span.metrics.insert(key, *v);
185 }
186 _ => anyhow::bail!(
187 "Error reading span metrics, value is not a float or integer: {v}"
188 ),
189 }
190 }
191 _ => anyhow::bail!("Error reading span metrics, key is not an integer: {k}"),
192 }
193 }
194 }
195 val => anyhow::bail!("Error reading span metrics, value is not a map: {val}"),
196 }
197
198 match read_value(reader)? {
200 Value::Integer(s) => span.r#type = str_from_dict(dict, s)?,
201 val => anyhow::bail!("Error reading span type, value is not an integer: {val}"),
202 }
203 Ok(span)
204}
205
206#[inline]
207fn str_from_dict(dict: &[String], id: Integer) -> anyhow::Result<String> {
208 let id = id
209 .as_i64()
210 .ok_or_else(|| anyhow!("Error reading string from dict, id is not an integer: {id}"))?
211 as usize;
212 if id >= dict.len() {
213 anyhow::bail!("Error reading string from dict, id out of bounds: {id}");
214 }
215 Ok(dict[id].to_string())
216}
217
218#[inline]
219fn get_v05_string(
220 reader: &mut Reader<impl Buf>,
221 dict: &[String],
222 field_name: &str,
223) -> anyhow::Result<String> {
224 match read_value(reader)? {
225 Value::Integer(s) => {
226 str_from_dict(dict, s)
227 },
228 val => anyhow::bail!("Error reading {field_name}, value is not an integer and can't be looked up in dict: {val}")
229 }
230}
231
232pub async fn get_v05_traces_from_request_body<B>(
233 body: B,
234) -> anyhow::Result<(usize, Vec<Vec<pb::Span>>)>
235where
236 B: http_body::Body,
237 B::Error: std::error::Error + Send + Sync + 'static,
238{
239 let buffer = body.collect().await?.aggregate();
240 let body_size = buffer.remaining();
241 let mut reader = buffer.reader();
242 let wrapper_size = read_array_len(&mut reader)?;
243 if wrapper_size != 2 {
244 anyhow::bail!("Expected an arrary of exactly 2 elements, got {wrapper_size}");
245 }
246
247 let dict = get_v05_strings_dict(&mut reader)?;
248
249 let traces_size = rmp::decode::read_array_len(&mut reader)?;
250 let mut traces: Vec<Vec<pb::Span>> = Default::default();
251
252 for _ in 0..traces_size {
253 let spans_size = rmp::decode::read_array_len(&mut reader)?;
254 let mut trace: Vec<pb::Span> = Default::default();
255
256 for _ in 0..spans_size {
257 let span = get_v05_span(&mut reader, &dict)?;
258 trace.push(span);
259 }
260 traces.push(trace);
261 }
262 Ok((body_size, traces))
263}
264
/// Tags read from a trace's root span `meta` and copied onto the resulting
/// `pb::TracerPayload` by `construct_tracer_payload`.
///
/// Every field defaults to the empty string when the corresponding tag is absent.
#[derive(Clone, Copy, Debug, Default)]
pub struct RootSpanTags<'a> {
    /// Value of the root span's `env` tag.
    pub env: &'a str,
    /// Value of the root span's `version` tag, reported as the payload's `app_version`.
    pub app_version: &'a str,
    /// Value of the root span's `_dd.hostname` tag.
    pub hostname: &'a str,
    /// Value of the root span's `runtime-id` tag.
    pub runtime_id: &'a str,
}
273
274pub(crate) fn construct_trace_chunk(trace: Vec<pb::Span>) -> pb::TraceChunk {
275 pb::TraceChunk {
276 priority: normalizer::SamplerPriority::None as i32,
277 origin: "".to_string(),
278 spans: trace,
279 tags: HashMap::new(),
280 dropped_trace: false,
281 }
282}
283
284pub(crate) fn construct_tracer_payload(
285 chunks: Vec<pb::TraceChunk>,
286 tracer_tags: &TracerHeaderTags,
287 root_span_tags: RootSpanTags,
288) -> pb::TracerPayload {
289 pb::TracerPayload {
290 app_version: root_span_tags.app_version.to_string(),
291 language_name: tracer_tags.lang.to_string(),
292 container_id: tracer_tags.container_id.to_string(),
293 env: root_span_tags.env.to_string(),
294 runtime_id: root_span_tags.runtime_id.to_string(),
295 chunks,
296 hostname: root_span_tags.hostname.to_string(),
297 language_version: tracer_tags.lang_version.to_string(),
298 tags: HashMap::new(),
299 tracer_version: tracer_tags.tracer_version.to_string(),
300 }
301}
302
303pub(crate) fn cmp_send_data_payloads(a: &pb::TracerPayload, b: &pb::TracerPayload) -> Ordering {
304 a.tracer_version
305 .cmp(&b.tracer_version)
306 .then(a.language_version.cmp(&b.language_version))
307 .then(a.language_name.cmp(&b.language_name))
308 .then(a.hostname.cmp(&b.hostname))
309 .then(a.container_id.cmp(&b.container_id))
310 .then(a.runtime_id.cmp(&b.runtime_id))
311 .then(a.env.cmp(&b.env))
312 .then(a.app_version.cmp(&b.app_version))
313}
314
315pub fn coalesce_send_data(mut data: Vec<SendData>) -> Vec<SendData> {
316 data.sort_unstable_by(|a, b| {
319 a.get_target()
320 .url
321 .to_string()
322 .cmp(&b.get_target().url.to_string())
323 .then(a.get_target().test_token.cmp(&b.get_target().test_token))
324 });
325 data.dedup_by(|a, b| {
326 if a.get_target().url == b.get_target().url
327 && a.get_target().test_token == b.get_target().test_token
328 {
329 if a.size + b.size < MAX_PAYLOAD_SIZE / 2 {
334 b.tracer_payloads.append(&mut a.tracer_payloads);
336 b.size += a.size;
337 return true;
338 }
339 }
340 false
341 });
342 for send_data in data.iter_mut() {
345 send_data.tracer_payloads.merge();
346 }
347 data
348}
349
350pub fn get_root_span_index(trace: &[pb::Span]) -> anyhow::Result<usize> {
351 if trace.is_empty() {
352 anyhow::bail!("Cannot find root span index in an empty trace.");
353 }
354
355 for (i, span) in trace.iter().enumerate().rev() {
358 if span.parent_id == 0 {
359 return Ok(i);
360 }
361 }
362
363 let mut span_ids: HashSet<u64> = HashSet::with_capacity(trace.len());
364 for span in trace.iter() {
365 span_ids.insert(span.span_id);
366 }
367
368 let mut root_span_id = None;
369 for (i, span) in trace.iter().enumerate() {
370 if !span_ids.contains(&span.parent_id) {
372 if root_span_id.is_some() {
373 debug!(
374 trace_id = &trace[0].trace_id,
375 "trace has multiple root spans"
376 );
377 }
378 root_span_id = Some(i);
379 }
380 }
381 Ok(match root_span_id {
382 Some(i) => i,
383 None => {
384 debug!(
385 trace_id = &trace[0].trace_id,
386 "Could not find the root span for trace"
387 );
388 trace.len() - 1
389 }
390 })
391}
392
393pub fn compute_top_level_span(trace: &mut [pb::Span]) {
400 let mut span_id_to_service: HashMap<u64, String> = HashMap::new();
401 for span in trace.iter() {
402 span_id_to_service.insert(span.span_id, span.service.clone());
403 }
404 for span in trace.iter_mut() {
405 if span.parent_id == 0 {
406 set_top_level_span(span, true);
407 continue;
408 }
409 match span_id_to_service.get(&span.parent_id) {
410 Some(parent_span_service) => {
411 if !parent_span_service.eq(&span.service) {
412 set_top_level_span(span, true)
414 }
415 }
416 None => {
417 set_top_level_span(span, true)
419 }
420 }
421 }
422}
423
424pub fn has_top_level(span: &pb::Span) -> bool {
426 span.metrics
427 .get(TRACER_TOP_LEVEL_KEY)
428 .is_some_and(|v| *v == 1.0)
429 || span.metrics.get(TOP_LEVEL_KEY).is_some_and(|v| *v == 1.0)
430}
431
432fn set_top_level_span(span: &mut pb::Span, is_top_level: bool) {
433 if is_top_level {
434 span.metrics.insert(TOP_LEVEL_KEY.to_string(), 1.0);
435 } else {
436 span.metrics.remove(TOP_LEVEL_KEY);
437 }
438}
439
440pub fn set_serverless_root_span_tags(
441 span: &mut pb::Span,
442 app_name: Option<String>,
443 env_type: &EnvironmentType,
444) {
445 let origin_tag = match env_type {
446 EnvironmentType::CloudFunction => "cloudfunction",
447 EnvironmentType::AzureFunction => "azurefunction",
448 EnvironmentType::AzureSpringApp => "azurespringapp",
449 EnvironmentType::LambdaFunction => "lambda", };
451 span.meta
452 .insert("_dd.origin".to_string(), origin_tag.to_string());
453 span.meta
454 .insert("origin".to_string(), origin_tag.to_string());
455
456 if let Some(function_name) = app_name {
457 match env_type {
458 EnvironmentType::CloudFunction
459 | EnvironmentType::AzureFunction
460 | EnvironmentType::LambdaFunction => {
461 span.meta.insert("functionname".to_string(), function_name);
462 }
463 _ => {}
464 }
465 }
466}
467
468fn update_tracer_top_level(span: &mut pb::Span) {
469 if span.metrics.contains_key(TRACER_TOP_LEVEL_KEY) {
470 span.metrics.insert(TOP_LEVEL_KEY.to_string(), 1.0);
471 }
472}
473
/// Serverless environment a span comes from. It selects the origin tag written by
/// `set_serverless_root_span_tags`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EnvironmentType {
    /// Google Cloud Functions (origin `cloudfunction`).
    CloudFunction,
    /// Azure Functions (origin `azurefunction`).
    AzureFunction,
    /// Azure Spring Apps (origin `azurespringapp`).
    AzureSpringApp,
    /// AWS Lambda (origin `lambda`).
    LambdaFunction,
}
481
/// Environment metadata that the enrichment functions in this module copy onto spans.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MiniAgentMetadata {
    /// Azure Spring App hostname, written to the `asa.hostname` tag when set.
    pub azure_spring_app_hostname: Option<String>,
    /// Azure Spring App name, written to the `asa.name` tag when set.
    pub azure_spring_app_name: Option<String>,
    /// GCP project id, used for the `gcrfx.*` tags.
    pub gcp_project_id: Option<String>,
    /// GCP region, used for the `gcrfx.*` tags.
    pub gcp_region: Option<String>,
    /// Serverless-compat version, written to `_dd.serverless_compat_version` when set.
    pub version: Option<String>,
}

impl Default for MiniAgentMetadata {
    /// Leaves all cloud fields unset. `version` is read from the
    /// `DD_SERVERLESS_COMPAT_VERSION` environment variable and is `None` when that variable
    /// is missing or not valid Unicode.
    fn default() -> Self {
        let version = env::var("DD_SERVERLESS_COMPAT_VERSION").ok();
        Self {
            azure_spring_app_hostname: None,
            azure_spring_app_name: None,
            gcp_project_id: None,
            gcp_region: None,
            version,
        }
    }
}
502
503pub fn enrich_span_with_mini_agent_metadata(
504 span: &mut pb::Span,
505 mini_agent_metadata: &MiniAgentMetadata,
506) {
507 if let Some(azure_spring_app_hostname) = &mini_agent_metadata.azure_spring_app_hostname {
508 span.meta.insert(
509 "asa.hostname".to_string(),
510 azure_spring_app_hostname.to_string(),
511 );
512 }
513 if let Some(azure_spring_app_name) = &mini_agent_metadata.azure_spring_app_name {
514 span.meta
515 .insert("asa.name".to_string(), azure_spring_app_name.to_string());
516 }
517 if let Some(serverless_compat_version) = &mini_agent_metadata.version {
518 span.meta.insert(
519 "_dd.serverless_compat_version".to_string(),
520 serverless_compat_version.to_string(),
521 );
522 }
523}
524
525pub fn enrich_span_with_google_cloud_function_metadata(
526 span: &mut pb::Span,
527 mini_agent_metadata: &MiniAgentMetadata,
528 function: Option<String>,
529) {
530 #[allow(clippy::todo)]
531 let Some(region) = &mini_agent_metadata.gcp_region
532 else {
533 todo!()
534 };
535 #[allow(clippy::todo)]
536 let Some(project) = &mini_agent_metadata.gcp_project_id
537 else {
538 todo!()
539 };
540
541 if let Some(function) = function {
542 if !region.is_empty() && !project.is_empty() {
543 let resource_name = format!(
544 "projects/{}/locations/{}/functions/{}",
545 project, region, function
546 );
547
548 span.meta
549 .insert("gcrfx.location".to_string(), region.to_string());
550 span.meta
551 .insert("gcrfx.project_id".to_string(), project.to_string());
552 span.meta
553 .insert("gcrfx.resource_name".to_string(), resource_name.to_string());
554 }
555 }
556}
557
558pub fn enrich_span_with_azure_function_metadata(span: &mut pb::Span) {
559 if span.name == "azure.apim" {
560 return;
561 }
562
563 if let Some(aas_metadata) = &*azure_app_services::AAS_METADATA_FUNCTION {
564 span.meta.extend(
565 aas_metadata
566 .get_function_tags()
567 .map(|(name, value)| (name.to_string(), value.to_string())),
568 );
569 }
570}
571
/// Copies selected tags from a span meta map into struct fields.
///
/// Usage: `parse_root_span_tags!(map, { "tag" => target.field, ... })`. Every entry needs
/// a trailing comma. A field is assigned only when its tag is present in `map`, so absent
/// tags leave the field's current value untouched.
macro_rules! parse_root_span_tags {
    (
        $meta:ident,
        { $($key:literal => $($field:ident).+ ,)+ }
    ) => {
        $(
            if let Some(found_value) = $meta.get($key) {
                $($field).+ = found_value;
            }
        )+
    };
}
585
586pub fn collect_trace_chunks<T: TraceData>(
587 traces: Vec<Vec<crate::span::v04::Span<T>>>,
588 use_v05_format: bool,
589) -> anyhow::Result<TraceChunks<T>> {
590 if use_v05_format {
591 let mut shared_dict = SharedDict::default();
592 let mut v05_traces: Vec<Vec<v05::Span>> = Vec::with_capacity(traces.len());
593 for trace in traces {
594 let trace_len = trace.len();
595 let v05_trace = trace.into_iter().try_fold(
596 Vec::with_capacity(trace_len),
597 |mut acc, span| -> anyhow::Result<Vec<v05::Span>> {
598 acc.push(v05::from_v04_span(span, &mut shared_dict)?);
599 Ok(acc)
600 },
601 )?;
602
603 v05_traces.push(v05_trace);
604 }
605 Ok(TraceChunks::V05((shared_dict, v05_traces)))
606 } else {
607 Ok(TraceChunks::V04(traces))
608 }
609}
610
611pub fn collect_pb_trace_chunks<T: tracer_payload::TraceChunkProcessor>(
612 mut traces: Vec<Vec<pb::Span>>,
613 tracer_header_tags: &TracerHeaderTags,
614 process_chunk: &mut T,
615 is_agentless: bool,
616) -> anyhow::Result<TracerPayloadCollection> {
617 let mut trace_chunks: Vec<pb::TraceChunk> = Vec::new();
618
619 let mut gathered_root_span_tags = !is_agentless;
621 let mut root_span_tags = RootSpanTags::default();
622
623 for trace in traces.iter_mut() {
624 if is_agentless {
625 if let Err(e) = normalizer::normalize_trace(trace) {
626 error!("Error normalizing trace: {e}");
627 }
628 }
629
630 let mut chunk = construct_trace_chunk(trace.to_vec());
631
632 let root_span_index = match get_root_span_index(trace) {
633 Ok(res) => res,
634 Err(e) => {
635 error!("Error getting the root span index of a trace, skipping. {e}");
636 continue;
637 }
638 };
639
640 if let Err(e) = normalizer::normalize_chunk(&mut chunk, root_span_index) {
641 error!("Error normalizing trace chunk: {e}");
642 }
643
644 for span in chunk.spans.iter_mut() {
645 if tracer_header_tags.client_computed_top_level {
647 update_tracer_top_level(span);
648 }
649 }
650
651 if !tracer_header_tags.client_computed_top_level {
652 compute_top_level_span(&mut chunk.spans);
653 }
654
655 process_chunk.process(&mut chunk, root_span_index);
656
657 trace_chunks.push(chunk);
658
659 if !gathered_root_span_tags {
660 gathered_root_span_tags = true;
661 let meta_map = &trace[root_span_index].meta;
662 parse_root_span_tags!(
663 meta_map,
664 {
665 "env" => root_span_tags.env,
666 "version" => root_span_tags.app_version,
667 "_dd.hostname" => root_span_tags.hostname,
668 "runtime-id" => root_span_tags.runtime_id,
669 }
670 );
671 }
672 }
673
674 Ok(TracerPayloadCollection::V07(vec![
675 construct_tracer_payload(trace_chunks, tracer_header_tags, root_span_tags),
676 ]))
677}
678
679pub fn is_measured(span: &pb::Span) -> bool {
681 span.metrics.get(MEASURED_KEY).is_some_and(|v| *v == 1.0)
682}
683
684pub fn is_partial_snapshot(span: &pb::Span) -> bool {
690 span.metrics
691 .get(PARTIAL_VERSION_KEY)
692 .is_some_and(|v| *v >= 0.0)
693}
694
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        span::SharedDictBytes,
        test_utils::{create_test_no_alloc_span, create_test_span},
    };
    use hyper::Request;
    use libdd_common::http_common;
    use libdd_common::Endpoint;
    use serde_json::json;

    /// Returns the dictionary index of `value`, if present.
    fn find_index_in_dict(dict: &SharedDictBytes, value: &str) -> Option<u32> {
        let idx = dict.iter().position(|e| e.as_str() == value);
        idx.map(|idx| idx.try_into().unwrap())
    }

    #[test]
    fn test_coalescing_does_not_exceed_max_size() {
        fn dummy() -> SendData {
            SendData::new(
                MAX_PAYLOAD_SIZE / 5 + 1,
                TracerPayloadCollection::V07(vec![pb::TracerPayload {
                    container_id: "".to_string(),
                    language_name: "".to_string(),
                    language_version: "".to_string(),
                    tracer_version: "".to_string(),
                    runtime_id: "".to_string(),
                    chunks: vec![pb::TraceChunk {
                        priority: 0,
                        origin: "".to_string(),
                        spans: vec![],
                        tags: Default::default(),
                        dropped_trace: false,
                    }],
                    tags: Default::default(),
                    env: "".to_string(),
                    hostname: "".to_string(),
                    app_version: "".to_string(),
                }]),
                TracerHeaderTags::default(),
                &Endpoint::default(),
            )
        }
        let coalesced = coalesce_send_data(vec![dummy(), dummy(), dummy(), dummy(), dummy()]);
        assert_eq!(
            5,
            coalesced
                .iter()
                .map(|s| s.tracer_payloads.size())
                .sum::<usize>()
        );
        // At least one coalesced SendData must carry more than one chunk.
        assert!(
            coalesced
                .iter()
                .map(|s| {
                    if let TracerPayloadCollection::V07(collection) = &s.tracer_payloads {
                        collection.iter().map(|s| s.chunks.len()).max().unwrap()
                    } else {
                        0
                    }
                })
                .max()
                .unwrap()
                > 1
        );
        assert!(coalesced.len() > 1 && coalesced.len() < 5);
    }

    #[tokio::test]
    #[allow(clippy::type_complexity)]
    #[cfg_attr(all(miri, target_os = "macos"), ignore)]
    async fn test_get_v05_traces_from_request_body() {
        let data: (
            Vec<String>,
            Vec<
                Vec<(
                    u8,
                    u8,
                    u8,
                    u64,
                    u64,
                    u64,
                    i64,
                    i64,
                    i32,
                    HashMap<u8, u8>,
                    HashMap<u8, f64>,
                    u8,
                )>,
            >,
        ) = (
            vec![
                "baggage".to_string(),
                "item".to_string(),
                "elasticsearch.version".to_string(),
                "7.0".to_string(),
                "my-name".to_string(),
                "X".to_string(),
                "my-service".to_string(),
                "my-resource".to_string(),
                "_dd.sampling_rate_whatever".to_string(),
                "value whatever".to_string(),
                "sql".to_string(),
            ],
            vec![vec![(
                6,
                4,
                7,
                1,
                2,
                3,
                123,
                456,
                1,
                HashMap::from([(8, 9), (0, 1), (2, 3)]),
                HashMap::from([(5, 1.2)]),
                10,
            )]],
        );
        let bytes = rmp_serde::to_vec(&data).unwrap();
        let res = get_v05_traces_from_request_body(http_common::Body::from(bytes)).await;
        assert!(res.is_ok());
        let (_, traces) = res.unwrap();
        let span = traces[0][0].clone();
        let test_span = pb::Span {
            service: "my-service".to_string(),
            name: "my-name".to_string(),
            resource: "my-resource".to_string(),
            trace_id: 1,
            span_id: 2,
            parent_id: 3,
            start: 123,
            duration: 456,
            error: 1,
            meta: HashMap::from([
                ("baggage".to_string(), "item".to_string()),
                ("elasticsearch.version".to_string(), "7.0".to_string()),
                (
                    "_dd.sampling_rate_whatever".to_string(),
                    "value whatever".to_string(),
                ),
            ]),
            metrics: HashMap::from([("X".to_string(), 1.2)]),
            meta_struct: HashMap::default(),
            r#type: "sql".to_string(),
            span_links: vec![],
            span_events: vec![],
        };
        assert_eq!(span, test_span);
    }

    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn test_get_traces_from_request_body() {
        let pairs = vec![
            (
                json!([{
                    "service": "test-service",
                    "name": "test-service-name",
                    "resource": "test-service-resource",
                    "trace_id": 111,
                    "span_id": 222,
                    "parent_id": 333,
                    "start": 1,
                    "duration": 5,
                    "error": 0,
                    "meta": {},
                    "metrics": {},
                }]),
                vec![vec![pb::Span {
                    service: "test-service".to_string(),
                    name: "test-service-name".to_string(),
                    resource: "test-service-resource".to_string(),
                    trace_id: 111,
                    span_id: 222,
                    parent_id: 333,
                    start: 1,
                    duration: 5,
                    error: 0,
                    meta: HashMap::new(),
                    metrics: HashMap::new(),
                    meta_struct: HashMap::new(),
                    r#type: "".to_string(),
                    span_links: vec![],
                    span_events: vec![],
                }]],
            ),
            (
                json!([{
                    "name": "test-service-name",
                    "resource": "test-service-resource",
                    "trace_id": 111,
                    "span_id": 222,
                    "start": 1,
                    "duration": 5,
                    "meta": {},
                }]),
                vec![vec![pb::Span {
                    service: "".to_string(),
                    name: "test-service-name".to_string(),
                    resource: "test-service-resource".to_string(),
                    trace_id: 111,
                    span_id: 222,
                    parent_id: 0,
                    start: 1,
                    duration: 5,
                    error: 0,
                    meta: HashMap::new(),
                    metrics: HashMap::new(),
                    meta_struct: HashMap::new(),
                    r#type: "".to_string(),
                    span_links: vec![],
                    span_events: vec![],
                }]],
            ),
        ];

        for (trace_input, output) in pairs {
            let bytes = rmp_serde::to_vec(&vec![&trace_input]).unwrap();
            let request = Request::builder()
                .body(http_common::Body::from(bytes))
                .unwrap();
            let res = get_traces_from_request_body(request.into_body()).await;
            assert!(res.is_ok());
            assert_eq!(res.unwrap().1, output);
        }
    }

    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn test_get_traces_from_request_body_with_span_links() {
        let trace_input = json!([[{
            "service": "test-service",
            "name": "test-name",
            "resource": "test-resource",
            "trace_id": 111,
            "span_id": 222,
            "parent_id": 333,
            "start": 1,
            "duration": 5,
            "error": 0,
            "meta": {},
            "metrics": {},
            "span_links": [{
                "trace_id": 999,
                "span_id": 888,
                "trace_id_high": 777,
                "attributes": {"key": "value"},
                "tracestate": "vendor=value"
            }]
        }]]);

        let expected_output = vec![vec![pb::Span {
            service: "test-service".to_string(),
            name: "test-name".to_string(),
            resource: "test-resource".to_string(),
            trace_id: 111,
            span_id: 222,
            parent_id: 333,
            start: 1,
            duration: 5,
            error: 0,
            meta: HashMap::new(),
            metrics: HashMap::new(),
            meta_struct: HashMap::new(),
            r#type: String::new(),
            span_links: vec![pb::SpanLink {
                trace_id: 999,
                span_id: 888,
                trace_id_high: 777,
                attributes: HashMap::from([("key".to_string(), "value".to_string())]),
                tracestate: "vendor=value".to_string(),
                flags: 0,
            }],
            span_events: vec![],
        }]];

        let bytes = rmp_serde::to_vec(&trace_input).unwrap();
        let request = Request::builder()
            .body(http_common::Body::from(bytes))
            .unwrap();

        let res = get_traces_from_request_body(request.into_body()).await;
        assert!(res.is_ok(), "Failed to deserialize: {res:?}");
        assert_eq!(res.unwrap().1, expected_output);
    }

    #[test]
    fn test_get_root_span_index_from_complete_trace() {
        let trace = vec![
            create_test_span(1234, 12341, 0, 1, false),
            create_test_span(1234, 12342, 12341, 1, false),
            create_test_span(1234, 12343, 12342, 1, false),
        ];

        let root_span_index = get_root_span_index(&trace);
        assert!(root_span_index.is_ok());
        assert_eq!(root_span_index.unwrap(), 0);
    }

    #[test]
    fn test_get_root_span_index_from_partial_trace() {
        let trace = vec![
            create_test_span(1234, 12342, 12341, 1, false),
            // Its parent (12340) is not in the trace, so this span is the root.
            create_test_span(1234, 12341, 12340, 1, false),
            create_test_span(1234, 12343, 12342, 1, false),
        ];

        let root_span_index = get_root_span_index(&trace);
        assert!(root_span_index.is_ok());
        assert_eq!(root_span_index.unwrap(), 1);
    }

    #[test]
    fn test_set_serverless_root_span_tags_azure_function() {
        let mut span = create_test_span(1234, 12342, 12341, 1, false);
        set_serverless_root_span_tags(
            &mut span,
            Some("test_function".to_string()),
            &EnvironmentType::AzureFunction,
        );
        assert_eq!(
            span.meta,
            HashMap::from([
                (
                    "runtime-id".to_string(),
                    "test-runtime-id-value".to_string()
                ),
                ("_dd.origin".to_string(), "azurefunction".to_string()),
                ("origin".to_string(), "azurefunction".to_string()),
                ("functionname".to_string(), "test_function".to_string()),
                ("env".to_string(), "test-env".to_string()),
                ("service".to_string(), "test-service".to_string())
            ]),
        );
    }

    #[test]
    fn test_set_serverless_root_span_tags_cloud_function() {
        let mut span = create_test_span(1234, 12342, 12341, 1, false);
        set_serverless_root_span_tags(
            &mut span,
            Some("test_function".to_string()),
            &EnvironmentType::CloudFunction,
        );
        assert_eq!(
            span.meta,
            HashMap::from([
                (
                    "runtime-id".to_string(),
                    "test-runtime-id-value".to_string()
                ),
                ("_dd.origin".to_string(), "cloudfunction".to_string()),
                ("origin".to_string(), "cloudfunction".to_string()),
                ("functionname".to_string(), "test_function".to_string()),
                ("env".to_string(), "test-env".to_string()),
                ("service".to_string(), "test-service".to_string())
            ]),
        );
    }

    #[test]
    fn test_has_top_level() {
        let top_level_span = create_test_span(123, 1234, 12, 1, true);
        let not_top_level_span = create_test_span(123, 1234, 12, 1, false);
        assert!(has_top_level(&top_level_span));
        assert!(!has_top_level(&not_top_level_span));
    }

    #[test]
    fn test_is_measured() {
        let mut measured_span = create_test_span(123, 1234, 12, 1, true);
        measured_span.metrics.insert(MEASURED_KEY.into(), 1.0);
        let not_measured_span = create_test_span(123, 1234, 12, 1, true);
        assert!(is_measured(&measured_span));
        assert!(!is_measured(&not_measured_span));
    }

    #[test]
    fn test_compute_top_level() {
        let mut span_with_different_service = create_test_span(123, 5, 2, 1, false);
        span_with_different_service.service = "another_service".into();
        let mut trace = vec![
            // Root span: parent_id is 0.
            create_test_span(123, 1, 0, 1, false),
            // Same service as its parent: not top-level.
            create_test_span(123, 2, 1, 1, false),
            // Parent 3 is missing from the trace: top-level.
            create_test_span(123, 4, 3, 1, false),
            // Different service from its parent: top-level.
            span_with_different_service,
        ];

        compute_top_level_span(trace.as_mut_slice());

        let spans_marked_as_top_level: Vec<u64> = trace
            .iter()
            .filter_map(|span| {
                if has_top_level(span) {
                    Some(span.span_id)
                } else {
                    None
                }
            })
            .collect();
        assert_eq!(spans_marked_as_top_level, [1, 4, 5])
    }

    #[test]
    fn test_collect_trace_chunks_v05() {
        let chunk = vec![create_test_no_alloc_span(123, 456, 789, 1, true)];

        let collection = collect_trace_chunks(vec![chunk], true).unwrap();

        let (dict, traces) = match collection {
            TraceChunks::V05(payload) => payload,
            _ => panic!("Unexpected type"),
        };

        assert_eq!(dict.len(), 16);

        let span = &traces[0][0];
        assert_eq!(span.service, 1);
        assert_eq!(span.name, 2);
        assert_eq!(span.resource, 3);
        assert_eq!(span.trace_id, 123);
        assert_eq!(span.span_id, 456);
        assert_eq!(span.parent_id, 789);
        assert_eq!(span.start, 1);
        assert_eq!(span.error, 0);
        assert_eq!(span.r#type, 15);
        assert_eq!(
            *span
                .meta
                .get(&find_index_in_dict(&dict, "service").unwrap())
                .unwrap(),
            find_index_in_dict(&dict, "test-service").unwrap()
        );
        assert_eq!(
            *span
                .meta
                .get(&find_index_in_dict(&dict, "env").unwrap())
                .unwrap(),
            find_index_in_dict(&dict, "test-env").unwrap()
        );
        assert_eq!(
            *span
                .meta
                .get(&find_index_in_dict(&dict, "runtime-id").unwrap())
                .unwrap(),
            find_index_in_dict(&dict, "test-runtime-id-value").unwrap()
        );
        assert_eq!(
            *span
                .meta
                .get(&find_index_in_dict(&dict, "_dd.origin").unwrap())
                .unwrap(),
            find_index_in_dict(&dict, "cloudfunction").unwrap()
        );
        assert_eq!(
            *span
                .meta
                .get(&find_index_in_dict(&dict, "origin").unwrap())
                .unwrap(),
            find_index_in_dict(&dict, "cloudfunction").unwrap()
        );
        assert_eq!(
            *span
                .meta
                .get(&find_index_in_dict(&dict, "functionname").unwrap())
                .unwrap(),
            find_index_in_dict(&dict, "dummy_function_name").unwrap()
        );
        assert_eq!(
            *span
                .metrics
                .get(&find_index_in_dict(&dict, "_top_level").unwrap())
                .unwrap(),
            1.0
        );
    }

    #[test]
    fn test_rmp_serde_deserialize_meta_with_null_values() {
        let span_json = json!({
            "service": "test-service",
            "name": "test_name",
            "resource": "test-resource",
            "trace_id": 1_u64,
            "span_id": 2_u64,
            "parent_id": 0_u64,
            "start": 0_i64,
            "duration": 5_i64,
            "error": 0_i32,
            "meta": {
                "service": "test-service",
                "env": "test-env",
                "runtime-id": "test-runtime-id-value",
                "problematic_key": null
            },
            "metrics": {},
            "type": "",
            "meta_struct": {},
            "span_links": [],
            "span_events": []
        });

        let traces_json = vec![vec![span_json]];
        let encoded_data = rmp_serde::to_vec(&traces_json).unwrap();
        let traces: Vec<Vec<pb::Span>> = rmp_serde::from_read(&encoded_data[..])
            .expect("Failed to deserialize traces with null values in meta");

        assert_eq!(1, traces.len());
        assert_eq!(1, traces[0].len());
        let decoded_span = &traces[0][0];

        assert_eq!("test-service", decoded_span.service);
        assert_eq!("test_name", decoded_span.name);
        assert_eq!("test-resource", decoded_span.resource);
        assert_eq!("test-service", decoded_span.meta.get("service").unwrap());
        assert_eq!("test-env", decoded_span.meta.get("env").unwrap());
        assert_eq!(
            "test-runtime-id-value",
            decoded_span.meta.get("runtime-id").unwrap()
        );
        assert!(
            !decoded_span.meta.contains_key("problematic_key"),
            "Null value should be skipped, but key was present"
        );
    }

    #[test]
    fn test_enrich_span_with_azure_function_metadata_adds_tags_for_non_apim() {
        let mut span = create_test_span(1234, 12342, 12341, 1, false);
        span.name = "azure.function".to_string();

        enrich_span_with_azure_function_metadata(&mut span);

        // Only checkable when Azure function metadata is available in this environment.
        if azure_app_services::AAS_METADATA_FUNCTION.is_some() {
            assert!(span.meta.contains_key("aas.resource.id"));
            assert!(span.meta.contains_key("aas.environment.instance_id"));
            assert!(span.meta.contains_key("aas.environment.instance_name"));
            assert!(span.meta.contains_key("aas.subscription.id"));
            assert!(span.meta.contains_key("aas.environment.os"));
            assert!(span.meta.contains_key("aas.environment.runtime"));
            assert!(span.meta.contains_key("aas.environment.runtime_version"));
            assert!(span.meta.contains_key("aas.environment.function_runtime"));
            assert!(span.meta.contains_key("aas.resource.group"));
            assert!(span.meta.contains_key("aas.site.name"));
            assert!(span.meta.contains_key("aas.site.kind"));
            assert!(span.meta.contains_key("aas.site.type"));
        }
    }

    #[test]
    fn test_enrich_span_with_azure_function_metadata_skips_azure_apim() {
        let mut span = create_test_span(1234, 12342, 12341, 1, false);
        span.name = "azure.apim".to_string();

        enrich_span_with_azure_function_metadata(&mut span);

        assert!(!span.meta.contains_key("aas.resource.id"));
        assert!(!span.meta.contains_key("aas.environment.instance_id"));
        assert!(!span.meta.contains_key("aas.environment.instance_name"));
        assert!(!span.meta.contains_key("aas.subscription.id"));
        assert!(!span.meta.contains_key("aas.environment.os"));
        assert!(!span.meta.contains_key("aas.environment.runtime"));
        assert!(!span.meta.contains_key("aas.environment.runtime_version"));
        assert!(!span.meta.contains_key("aas.environment.function_runtime"));
        assert!(!span.meta.contains_key("aas.resource.group"));
        assert!(!span.meta.contains_key("aas.site.name"));
        assert!(!span.meta.contains_key("aas.site.kind"));
        assert!(!span.meta.contains_key("aas.site.type"));
    }
}
1282}