1pub use crate::send_data::send_data_result::SendDataResult;
5pub use crate::send_data::SendData;
6use crate::span::v05::dict::SharedDict;
7use crate::span::{v05, SpanText};
8pub use crate::tracer_header_tags::TracerHeaderTags;
9use crate::tracer_payload::TracerPayloadCollection;
10use crate::tracer_payload::{self, TraceChunks};
11use anyhow::anyhow;
12use bytes::buf::Reader;
13use http_body_util::BodyExt;
14use hyper::body::Buf;
15use libdd_common::{azure_app_services, hyper_migration};
16use libdd_trace_normalization::normalizer;
17use libdd_trace_protobuf::pb;
18use rmp::decode::read_array_len;
19use rmpv::decode::read_value;
20use rmpv::{Integer, Value};
21use std::cmp::Ordering;
22use std::collections::{HashMap, HashSet};
23use std::env;
24use tracing::{debug, error};
25
/// Size budget in bytes (25 MiB). `coalesce_send_data` only merges two payloads while their
/// combined size stays below half of this value.
pub const MAX_PAYLOAD_SIZE: usize = 25 * 1024 * 1024;
/// Span metric key written by `set_top_level_span` and checked by `has_top_level`.
const TOP_LEVEL_KEY: &str = "_top_level";
/// Span metric key checked by `has_top_level` and `update_tracer_top_level`.
/// NOTE(review): presumably set by the tracer itself — confirm.
const TRACER_TOP_LEVEL_KEY: &str = "_dd.top_level";
/// Span metric key read by `is_measured`.
const MEASURED_KEY: &str = "_dd.measured";
/// Span metric key read by `is_partial_snapshot`.
const PARTIAL_VERSION_KEY: &str = "_dd.partial_version";
/// Largest number of entries `get_v05_strings_dict` accepts in a v0.5 string dictionary.
const MAX_STRING_DICT_SIZE: u32 = 25_000_000;
/// Exact number of array elements `get_v05_span` requires for one encoded span.
const SPAN_ELEMENT_COUNT: usize = 12;
38
39pub async fn get_traces_from_request_body(
41 body: hyper_migration::Body,
42) -> anyhow::Result<(usize, Vec<Vec<pb::Span>>)> {
43 let buffer = body.collect().await?.aggregate();
44 let size = buffer.remaining();
45
46 let traces: Vec<Vec<pb::Span>> = match rmp_serde::from_read(buffer.reader()) {
47 Ok(res) => res,
48 Err(err) => {
49 anyhow::bail!("Error deserializing trace from request body: {err}")
50 }
51 };
52
53 Ok((size, traces))
54}
55
56#[inline]
57fn get_v05_strings_dict(reader: &mut Reader<impl Buf>) -> anyhow::Result<Vec<String>> {
58 let dict_size =
59 read_array_len(reader).map_err(|err| anyhow!("Error reading dict size: {err}"))?;
60 if dict_size > MAX_STRING_DICT_SIZE {
61 anyhow::bail!(
62 "Error deserializing strings dictionary. Dict size is too large: {dict_size}"
63 );
64 }
65 let mut dict: Vec<String> = Vec::with_capacity(dict_size.try_into()?);
66 for _ in 0..dict_size {
67 match read_value(reader)? {
68 Value::String(s) => {
69 let parsed_string = s.into_str().ok_or_else(|| anyhow!("Error reading string dict"))?;
70 dict.push(parsed_string);
71 }
72 val => anyhow::bail!("Error deserializing strings dictionary. Value in string dict is not a string: {val}")
73 }
74 }
75 Ok(dict)
76}
77
78#[inline]
79fn get_v05_span(reader: &mut Reader<impl Buf>, dict: &[String]) -> anyhow::Result<pb::Span> {
80 let mut span: pb::Span = Default::default();
81 let span_size = rmp::decode::read_array_len(reader)
82 .map_err(|err| anyhow!("Error reading span size: {err}"))? as usize;
83 if span_size != SPAN_ELEMENT_COUNT {
84 anyhow::bail!("Expected an array of exactly 12 elements in a span, got {span_size}");
85 }
86 span.service = get_v05_string(reader, dict, "service")?;
88 span.name = get_v05_string(reader, dict, "name")?;
90 span.resource = get_v05_string(reader, dict, "resource")?;
92
93 match read_value(reader)? {
95 Value::Integer(i) => {
96 span.trace_id = i.as_u64().ok_or_else(|| {
97 anyhow!("Error reading span trace_id, value is not an integer: {i}")
98 })?;
99 }
100 val => anyhow::bail!("Error reading span trace_id, value is not an integer: {val}"),
101 };
102 match read_value(reader)? {
104 Value::Integer(i) => {
105 span.span_id = i.as_u64().ok_or_else(|| {
106 anyhow!("Error reading span span_id, value is not an integer: {i}")
107 })?;
108 }
109 val => anyhow::bail!("Error reading span span_id, value is not an integer: {val}"),
110 };
111 match read_value(reader)? {
113 Value::Integer(i) => {
114 span.parent_id = i.as_u64().ok_or_else(|| {
115 anyhow!("Error reading span parent_id, value is not an integer: {i}")
116 })?;
117 }
118 val => anyhow::bail!("Error reading span parent_id, value is not an integer: {val}"),
119 };
120 match read_value(reader)? {
122 Value::Integer(i) => {
123 span.start = i
124 .as_i64()
125 .ok_or_else(|| anyhow!("Error reading span start, value is not an integer: {i}"))?;
126 }
127 val => anyhow::bail!("Error reading span start, value is not an integer: {val}"),
128 };
129 match read_value(reader)? {
131 Value::Integer(i) => {
132 span.duration = i.as_i64().ok_or_else(|| {
133 anyhow!("Error reading span duration, value is not an integer: {i}")
134 })?;
135 }
136 val => anyhow::bail!("Error reading span duration, value is not an integer: {val}"),
137 };
138 match read_value(reader)? {
140 Value::Integer(i) => {
141 span.error = i
142 .as_i64()
143 .ok_or_else(|| anyhow!("Error reading span error, value is not an integer: {i}"))?
144 as i32;
145 }
146 val => anyhow::bail!("Error reading span error, value is not an integer: {val}"),
147 }
148 match read_value(reader)? {
150 Value::Map(meta) => {
151 for (k, v) in meta.iter() {
152 match k {
153 Value::Integer(k) => {
154 match v {
155 Value::Integer(v) => {
156 let key = str_from_dict(dict, *k)?;
157 let val = str_from_dict(dict, *v)?;
158 span.meta.insert(key, val);
159 }
160 _ => anyhow::bail!("Error reading span meta, value is not an integer and can't be looked up in dict: {v}")
161 }
162 }
163 _ => anyhow::bail!("Error reading span meta, key is not an integer and can't be looked up in dict: {k}")
164 }
165 }
166 }
167 val => anyhow::bail!("Error reading span meta, value is not a map: {val}"),
168 }
169 match read_value(reader)? {
171 Value::Map(metrics) => {
172 for (k, v) in metrics.iter() {
173 match k {
174 Value::Integer(k) => {
175 match v {
176 Value::Integer(v) => {
177 let key = str_from_dict(dict, *k)?;
178 span.metrics.insert(key, v.as_f64().ok_or_else(||anyhow!("Error reading span metrics, value is not an integer: {v}"))?);
179 }
180 Value::F64(v) => {
181 let key = str_from_dict(dict, *k)?;
182 span.metrics.insert(key, *v);
183 }
184 _ => anyhow::bail!(
185 "Error reading span metrics, value is not a float or integer: {v}"
186 ),
187 }
188 }
189 _ => anyhow::bail!("Error reading span metrics, key is not an integer: {k}"),
190 }
191 }
192 }
193 val => anyhow::bail!("Error reading span metrics, value is not a map: {val}"),
194 }
195
196 match read_value(reader)? {
198 Value::Integer(s) => span.r#type = str_from_dict(dict, s)?,
199 val => anyhow::bail!("Error reading span type, value is not an integer: {val}"),
200 }
201 Ok(span)
202}
203
204#[inline]
205fn str_from_dict(dict: &[String], id: Integer) -> anyhow::Result<String> {
206 let id = id
207 .as_i64()
208 .ok_or_else(|| anyhow!("Error reading string from dict, id is not an integer: {id}"))?
209 as usize;
210 if id >= dict.len() {
211 anyhow::bail!("Error reading string from dict, id out of bounds: {id}");
212 }
213 Ok(dict[id].to_string())
214}
215
216#[inline]
217fn get_v05_string(
218 reader: &mut Reader<impl Buf>,
219 dict: &[String],
220 field_name: &str,
221) -> anyhow::Result<String> {
222 match read_value(reader)? {
223 Value::Integer(s) => {
224 str_from_dict(dict, s)
225 },
226 val => anyhow::bail!("Error reading {field_name}, value is not an integer and can't be looked up in dict: {val}")
227 }
228}
229
230pub async fn get_v05_traces_from_request_body(
231 body: hyper_migration::Body,
232) -> anyhow::Result<(usize, Vec<Vec<pb::Span>>)> {
233 let buffer = body.collect().await?.aggregate();
234 let body_size = buffer.remaining();
235 let mut reader = buffer.reader();
236 let wrapper_size = read_array_len(&mut reader)?;
237 if wrapper_size != 2 {
238 anyhow::bail!("Expected an arrary of exactly 2 elements, got {wrapper_size}");
239 }
240
241 let dict = get_v05_strings_dict(&mut reader)?;
242
243 let traces_size = rmp::decode::read_array_len(&mut reader)?;
244 let mut traces: Vec<Vec<pb::Span>> = Default::default();
245
246 for _ in 0..traces_size {
247 let spans_size = rmp::decode::read_array_len(&mut reader)?;
248 let mut trace: Vec<pb::Span> = Default::default();
249
250 for _ in 0..spans_size {
251 let span = get_v05_span(&mut reader, &dict)?;
252 trace.push(span);
253 }
254 traces.push(trace);
255 }
256 Ok((body_size, traces))
257}
258
/// Tag values borrowed from a trace's root span.
///
/// `collect_pb_trace_chunks` fills these from the root span's `meta` map and
/// `construct_tracer_payload` copies them into the resulting payload. Every field defaults
/// to the empty string.
#[derive(Default)]
pub struct RootSpanTags<'a> {
    /// Value of the `env` meta tag.
    pub env: &'a str,
    /// Value of the `version` meta tag.
    pub app_version: &'a str,
    /// Value of the `_dd.hostname` meta tag.
    pub hostname: &'a str,
    /// Value of the `runtime-id` meta tag.
    pub runtime_id: &'a str,
}
267
268pub(crate) fn construct_trace_chunk(trace: Vec<pb::Span>) -> pb::TraceChunk {
269 pb::TraceChunk {
270 priority: normalizer::SamplerPriority::None as i32,
271 origin: "".to_string(),
272 spans: trace,
273 tags: HashMap::new(),
274 dropped_trace: false,
275 }
276}
277
278pub(crate) fn construct_tracer_payload(
279 chunks: Vec<pb::TraceChunk>,
280 tracer_tags: &TracerHeaderTags,
281 root_span_tags: RootSpanTags,
282) -> pb::TracerPayload {
283 pb::TracerPayload {
284 app_version: root_span_tags.app_version.to_string(),
285 language_name: tracer_tags.lang.to_string(),
286 container_id: tracer_tags.container_id.to_string(),
287 env: root_span_tags.env.to_string(),
288 runtime_id: root_span_tags.runtime_id.to_string(),
289 chunks,
290 hostname: root_span_tags.hostname.to_string(),
291 language_version: tracer_tags.lang_version.to_string(),
292 tags: HashMap::new(),
293 tracer_version: tracer_tags.tracer_version.to_string(),
294 }
295}
296
297pub(crate) fn cmp_send_data_payloads(a: &pb::TracerPayload, b: &pb::TracerPayload) -> Ordering {
298 a.tracer_version
299 .cmp(&b.tracer_version)
300 .then(a.language_version.cmp(&b.language_version))
301 .then(a.language_name.cmp(&b.language_name))
302 .then(a.hostname.cmp(&b.hostname))
303 .then(a.container_id.cmp(&b.container_id))
304 .then(a.runtime_id.cmp(&b.runtime_id))
305 .then(a.env.cmp(&b.env))
306 .then(a.app_version.cmp(&b.app_version))
307}
308
309pub fn coalesce_send_data(mut data: Vec<SendData>) -> Vec<SendData> {
310 data.sort_unstable_by(|a, b| {
313 a.get_target()
314 .url
315 .to_string()
316 .cmp(&b.get_target().url.to_string())
317 .then(a.get_target().test_token.cmp(&b.get_target().test_token))
318 });
319 data.dedup_by(|a, b| {
320 if a.get_target().url == b.get_target().url
321 && a.get_target().test_token == b.get_target().test_token
322 {
323 if a.size + b.size < MAX_PAYLOAD_SIZE / 2 {
328 b.tracer_payloads.append(&mut a.tracer_payloads);
330 b.size += a.size;
331 return true;
332 }
333 }
334 false
335 });
336 for send_data in data.iter_mut() {
339 send_data.tracer_payloads.merge();
340 }
341 data
342}
343
344pub fn get_root_span_index(trace: &[pb::Span]) -> anyhow::Result<usize> {
345 if trace.is_empty() {
346 anyhow::bail!("Cannot find root span index in an empty trace.");
347 }
348
349 for (i, span) in trace.iter().enumerate().rev() {
352 if span.parent_id == 0 {
353 return Ok(i);
354 }
355 }
356
357 let mut span_ids: HashSet<u64> = HashSet::with_capacity(trace.len());
358 for span in trace.iter() {
359 span_ids.insert(span.span_id);
360 }
361
362 let mut root_span_id = None;
363 for (i, span) in trace.iter().enumerate() {
364 if !span_ids.contains(&span.parent_id) {
366 if root_span_id.is_some() {
367 debug!(
368 trace_id = &trace[0].trace_id,
369 "trace has multiple root spans"
370 );
371 }
372 root_span_id = Some(i);
373 }
374 }
375 Ok(match root_span_id {
376 Some(i) => i,
377 None => {
378 debug!(
379 trace_id = &trace[0].trace_id,
380 "Could not find the root span for trace"
381 );
382 trace.len() - 1
383 }
384 })
385}
386
387pub fn compute_top_level_span(trace: &mut [pb::Span]) {
394 let mut span_id_to_service: HashMap<u64, String> = HashMap::new();
395 for span in trace.iter() {
396 span_id_to_service.insert(span.span_id, span.service.clone());
397 }
398 for span in trace.iter_mut() {
399 if span.parent_id == 0 {
400 set_top_level_span(span, true);
401 continue;
402 }
403 match span_id_to_service.get(&span.parent_id) {
404 Some(parent_span_service) => {
405 if !parent_span_service.eq(&span.service) {
406 set_top_level_span(span, true)
408 }
409 }
410 None => {
411 set_top_level_span(span, true)
413 }
414 }
415 }
416}
417
418pub fn has_top_level(span: &pb::Span) -> bool {
420 span.metrics
421 .get(TRACER_TOP_LEVEL_KEY)
422 .is_some_and(|v| *v == 1.0)
423 || span.metrics.get(TOP_LEVEL_KEY).is_some_and(|v| *v == 1.0)
424}
425
426fn set_top_level_span(span: &mut pb::Span, is_top_level: bool) {
427 if is_top_level {
428 span.metrics.insert(TOP_LEVEL_KEY.to_string(), 1.0);
429 } else {
430 span.metrics.remove(TOP_LEVEL_KEY);
431 }
432}
433
434pub fn set_serverless_root_span_tags(
435 span: &mut pb::Span,
436 app_name: Option<String>,
437 env_type: &EnvironmentType,
438) {
439 let origin_tag = match env_type {
440 EnvironmentType::CloudFunction => "cloudfunction",
441 EnvironmentType::AzureFunction => "azurefunction",
442 EnvironmentType::AzureSpringApp => "azurespringapp",
443 EnvironmentType::LambdaFunction => "lambda", };
445 span.meta
446 .insert("_dd.origin".to_string(), origin_tag.to_string());
447 span.meta
448 .insert("origin".to_string(), origin_tag.to_string());
449
450 if let Some(function_name) = app_name {
451 match env_type {
452 EnvironmentType::CloudFunction
453 | EnvironmentType::AzureFunction
454 | EnvironmentType::LambdaFunction => {
455 span.meta.insert("functionname".to_string(), function_name);
456 }
457 _ => {}
458 }
459 }
460}
461
462fn update_tracer_top_level(span: &mut pb::Span) {
463 if span.metrics.contains_key(TRACER_TOP_LEVEL_KEY) {
464 span.metrics.insert(TOP_LEVEL_KEY.to_string(), 1.0);
465 }
466}
467
/// Serverless environment a trace comes from.
///
/// `set_serverless_root_span_tags` maps each variant to the origin tag it writes on the
/// root span.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum EnvironmentType {
    /// Tagged with origin `cloudfunction`.
    CloudFunction,
    /// Tagged with origin `azurefunction`.
    AzureFunction,
    /// Tagged with origin `azurespringapp`.
    AzureSpringApp,
    /// Tagged with origin `lambda`.
    LambdaFunction,
}
475
/// Optional environment metadata that the `enrich_span_with_*` functions copy onto spans.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct MiniAgentMetadata {
    /// Written to the `asa.hostname` span tag when present.
    pub azure_spring_app_hostname: Option<String>,
    /// Written to the `asa.name` span tag when present.
    pub azure_spring_app_name: Option<String>,
    /// Used for the `gcrfx.project_id` tag and the `gcrfx.resource_name` tag.
    pub gcp_project_id: Option<String>,
    /// Used for the `gcrfx.location` tag and the `gcrfx.resource_name` tag.
    pub gcp_region: Option<String>,
    /// Written to the `_dd.serverless_compat_version` span tag when present.
    pub version: Option<String>,
}
484
485impl Default for MiniAgentMetadata {
486 fn default() -> Self {
487 MiniAgentMetadata {
488 azure_spring_app_hostname: Default::default(),
489 azure_spring_app_name: Default::default(),
490 gcp_project_id: Default::default(),
491 gcp_region: Default::default(),
492 version: env::var("DD_SERVERLESS_COMPAT_VERSION").ok(),
493 }
494 }
495}
496
497pub fn enrich_span_with_mini_agent_metadata(
498 span: &mut pb::Span,
499 mini_agent_metadata: &MiniAgentMetadata,
500) {
501 if let Some(azure_spring_app_hostname) = &mini_agent_metadata.azure_spring_app_hostname {
502 span.meta.insert(
503 "asa.hostname".to_string(),
504 azure_spring_app_hostname.to_string(),
505 );
506 }
507 if let Some(azure_spring_app_name) = &mini_agent_metadata.azure_spring_app_name {
508 span.meta
509 .insert("asa.name".to_string(), azure_spring_app_name.to_string());
510 }
511 if let Some(serverless_compat_version) = &mini_agent_metadata.version {
512 span.meta.insert(
513 "_dd.serverless_compat_version".to_string(),
514 serverless_compat_version.to_string(),
515 );
516 }
517}
518
519pub fn enrich_span_with_google_cloud_function_metadata(
520 span: &mut pb::Span,
521 mini_agent_metadata: &MiniAgentMetadata,
522 function: Option<String>,
523) {
524 #[allow(clippy::todo)]
525 let Some(region) = &mini_agent_metadata.gcp_region
526 else {
527 todo!()
528 };
529 #[allow(clippy::todo)]
530 let Some(project) = &mini_agent_metadata.gcp_project_id
531 else {
532 todo!()
533 };
534
535 #[allow(clippy::unwrap_used)]
536 if function.is_some() && !region.is_empty() && !project.is_empty() {
537 let resource_name = format!(
538 "projects/{}/locations/{}/functions/{}",
539 project,
540 region,
541 function.unwrap()
542 );
543
544 span.meta
545 .insert("gcrfx.location".to_string(), region.to_string());
546 span.meta
547 .insert("gcrfx.project_id".to_string(), project.to_string());
548 span.meta
549 .insert("gcrfx.resource_name".to_string(), resource_name.to_string());
550 }
551}
552
553pub fn enrich_span_with_azure_function_metadata(span: &mut pb::Span) {
554 if let Some(aas_metadata) = &*azure_app_services::AAS_METADATA_FUNCTION {
555 let aas_tags = [
556 ("aas.resource.id", aas_metadata.get_resource_id()),
557 (
558 "aas.environment.instance_id",
559 aas_metadata.get_instance_id(),
560 ),
561 (
562 "aas.environment.instance_name",
563 aas_metadata.get_instance_name(),
564 ),
565 ("aas.subscription.id", aas_metadata.get_subscription_id()),
566 ("aas.environment.os", aas_metadata.get_operating_system()),
567 ("aas.environment.runtime", aas_metadata.get_runtime()),
568 (
569 "aas.environment.runtime_version",
570 aas_metadata.get_runtime_version(),
571 ),
572 (
573 "aas.environment.function_runtime",
574 aas_metadata.get_function_runtime_version(),
575 ),
576 ("aas.resource.group", aas_metadata.get_resource_group()),
577 ("aas.site.name", aas_metadata.get_site_name()),
578 ("aas.site.kind", aas_metadata.get_site_kind()),
579 ("aas.site.type", aas_metadata.get_site_type()),
580 ];
581 aas_tags.into_iter().for_each(|(name, value)| {
582 span.meta.insert(name.to_string(), value.to_string());
583 });
584 }
585}
586
// Copies selected entries of a meta map into struct fields.
//
// Arguments: the identifier of a map, then a brace-delimited list of
// `"tag" => path.to.field,` pairs (the trailing comma after each pair is required).
// For every pair, if the map contains the tag, its value is assigned to the field;
// fields whose tag is absent keep their current value.
macro_rules! parse_root_span_tags {
    (
        $root_span_meta_map:ident,
        { $($tag:literal => $($root_span_tags_struct_field:ident).+ ,)+ }
    ) => {
        $(
            if let Some(root_span_tag_value) = $root_span_meta_map.get($tag) {
                $($root_span_tags_struct_field).+ = root_span_tag_value;
            }
        )+
    }
}
600
601pub fn collect_trace_chunks<T: SpanText>(
602 traces: Vec<Vec<crate::span::Span<T>>>,
603 use_v05_format: bool,
604) -> anyhow::Result<TraceChunks<T>> {
605 if use_v05_format {
606 let mut shared_dict = SharedDict::default();
607 let mut v05_traces: Vec<Vec<v05::Span>> = Vec::with_capacity(traces.len());
608 for trace in traces {
609 let trace_len = trace.len();
610 let v05_trace = trace.into_iter().try_fold(
611 Vec::with_capacity(trace_len),
612 |mut acc, span| -> anyhow::Result<Vec<v05::Span>> {
613 acc.push(v05::from_span(span, &mut shared_dict)?);
614 Ok(acc)
615 },
616 )?;
617
618 v05_traces.push(v05_trace);
619 }
620 Ok(TraceChunks::V05((shared_dict, v05_traces)))
621 } else {
622 Ok(TraceChunks::V04(traces))
623 }
624}
625
626pub fn collect_pb_trace_chunks<T: tracer_payload::TraceChunkProcessor>(
627 mut traces: Vec<Vec<pb::Span>>,
628 tracer_header_tags: &TracerHeaderTags,
629 process_chunk: &mut T,
630 is_agentless: bool,
631) -> anyhow::Result<TracerPayloadCollection> {
632 let mut trace_chunks: Vec<pb::TraceChunk> = Vec::new();
633
634 let mut gathered_root_span_tags = !is_agentless;
636 let mut root_span_tags = RootSpanTags::default();
637
638 for trace in traces.iter_mut() {
639 if is_agentless {
640 if let Err(e) = normalizer::normalize_trace(trace) {
641 error!("Error normalizing trace: {e}");
642 }
643 }
644
645 let mut chunk = construct_trace_chunk(trace.to_vec());
646
647 let root_span_index = match get_root_span_index(trace) {
648 Ok(res) => res,
649 Err(e) => {
650 error!("Error getting the root span index of a trace, skipping. {e}");
651 continue;
652 }
653 };
654
655 if let Err(e) = normalizer::normalize_chunk(&mut chunk, root_span_index) {
656 error!("Error normalizing trace chunk: {e}");
657 }
658
659 for span in chunk.spans.iter_mut() {
660 if tracer_header_tags.client_computed_top_level {
662 update_tracer_top_level(span);
663 }
664 }
665
666 if !tracer_header_tags.client_computed_top_level {
667 compute_top_level_span(&mut chunk.spans);
668 }
669
670 process_chunk.process(&mut chunk, root_span_index);
671
672 trace_chunks.push(chunk);
673
674 if !gathered_root_span_tags {
675 gathered_root_span_tags = true;
676 let meta_map = &trace[root_span_index].meta;
677 parse_root_span_tags!(
678 meta_map,
679 {
680 "env" => root_span_tags.env,
681 "version" => root_span_tags.app_version,
682 "_dd.hostname" => root_span_tags.hostname,
683 "runtime-id" => root_span_tags.runtime_id,
684 }
685 );
686 }
687 }
688
689 Ok(TracerPayloadCollection::V07(vec![
690 construct_tracer_payload(trace_chunks, tracer_header_tags, root_span_tags),
691 ]))
692}
693
694pub fn is_measured(span: &pb::Span) -> bool {
696 span.metrics.get(MEASURED_KEY).is_some_and(|v| *v == 1.0)
697}
698
699pub fn is_partial_snapshot(span: &pb::Span) -> bool {
705 span.metrics
706 .get(PARTIAL_VERSION_KEY)
707 .is_some_and(|v| *v >= 0.0)
708}
709
710#[cfg(test)]
711mod tests {
712 use super::*;
713 use crate::{
714 span::SharedDictBytes,
715 test_utils::{create_test_no_alloc_span, create_test_span},
716 };
717 use hyper::Request;
718 use libdd_common::Endpoint;
719 use serde_json::json;
720
721 fn find_index_in_dict(dict: &SharedDictBytes, value: &str) -> Option<u32> {
722 let idx = dict.iter().position(|e| e.as_str() == value);
723 idx.map(|idx| idx.try_into().unwrap())
724 }
725
726 #[test]
727 fn test_coalescing_does_not_exceed_max_size() {
728 let dummy = SendData::new(
729 MAX_PAYLOAD_SIZE / 5 + 1,
730 TracerPayloadCollection::V07(vec![pb::TracerPayload {
731 container_id: "".to_string(),
732 language_name: "".to_string(),
733 language_version: "".to_string(),
734 tracer_version: "".to_string(),
735 runtime_id: "".to_string(),
736 chunks: vec![pb::TraceChunk {
737 priority: 0,
738 origin: "".to_string(),
739 spans: vec![],
740 tags: Default::default(),
741 dropped_trace: false,
742 }],
743 tags: Default::default(),
744 env: "".to_string(),
745 hostname: "".to_string(),
746 app_version: "".to_string(),
747 }]),
748 TracerHeaderTags::default(),
749 &Endpoint::default(),
750 );
751 let coalesced = coalesce_send_data(vec![
752 dummy.clone(),
753 dummy.clone(),
754 dummy.clone(),
755 dummy.clone(),
756 dummy.clone(),
757 ]);
758 assert_eq!(
759 5,
760 coalesced
761 .iter()
762 .map(|s| s.tracer_payloads.size())
763 .sum::<usize>()
764 );
765 assert!(
767 coalesced
768 .iter()
769 .map(|s| {
770 if let TracerPayloadCollection::V07(collection) = &s.tracer_payloads {
771 collection.iter().map(|s| s.chunks.len()).max().unwrap()
772 } else {
773 0
774 }
775 })
776 .max()
777 .unwrap()
778 > 1
779 );
780 assert!(coalesced.len() > 1 && coalesced.len() < 5);
781 }
782
783 #[tokio::test]
784 #[allow(clippy::type_complexity)]
785 #[cfg_attr(all(miri, target_os = "macos"), ignore)]
786 async fn test_get_v05_traces_from_request_body() {
787 let data: (
788 Vec<String>,
789 Vec<
790 Vec<(
791 u8,
792 u8,
793 u8,
794 u64,
795 u64,
796 u64,
797 i64,
798 i64,
799 i32,
800 HashMap<u8, u8>,
801 HashMap<u8, f64>,
802 u8,
803 )>,
804 >,
805 ) = (
806 vec![
807 "baggage".to_string(),
808 "item".to_string(),
809 "elasticsearch.version".to_string(),
810 "7.0".to_string(),
811 "my-name".to_string(),
812 "X".to_string(),
813 "my-service".to_string(),
814 "my-resource".to_string(),
815 "_dd.sampling_rate_whatever".to_string(),
816 "value whatever".to_string(),
817 "sql".to_string(),
818 ],
819 vec![vec![(
820 6,
821 4,
822 7,
823 1,
824 2,
825 3,
826 123,
827 456,
828 1,
829 HashMap::from([(8, 9), (0, 1), (2, 3)]),
830 HashMap::from([(5, 1.2)]),
831 10,
832 )]],
833 );
834 let bytes = rmp_serde::to_vec(&data).unwrap();
835 let res = get_v05_traces_from_request_body(hyper_migration::Body::from(bytes)).await;
836 assert!(res.is_ok());
837 let (_, traces) = res.unwrap();
838 let span = traces[0][0].clone();
839 let test_span = pb::Span {
840 service: "my-service".to_string(),
841 name: "my-name".to_string(),
842 resource: "my-resource".to_string(),
843 trace_id: 1,
844 span_id: 2,
845 parent_id: 3,
846 start: 123,
847 duration: 456,
848 error: 1,
849 meta: HashMap::from([
850 ("baggage".to_string(), "item".to_string()),
851 ("elasticsearch.version".to_string(), "7.0".to_string()),
852 (
853 "_dd.sampling_rate_whatever".to_string(),
854 "value whatever".to_string(),
855 ),
856 ]),
857 metrics: HashMap::from([("X".to_string(), 1.2)]),
858 meta_struct: HashMap::default(),
859 r#type: "sql".to_string(),
860 span_links: vec![],
861 span_events: vec![],
862 };
863 assert_eq!(span, test_span);
864 }
865
866 #[tokio::test]
867 #[cfg_attr(miri, ignore)]
868 async fn test_get_traces_from_request_body() {
869 let pairs = vec![
870 (
871 json!([{
872 "service": "test-service",
873 "name": "test-service-name",
874 "resource": "test-service-resource",
875 "trace_id": 111,
876 "span_id": 222,
877 "parent_id": 333,
878 "start": 1,
879 "duration": 5,
880 "error": 0,
881 "meta": {},
882 "metrics": {},
883 }]),
884 vec![vec![pb::Span {
885 service: "test-service".to_string(),
886 name: "test-service-name".to_string(),
887 resource: "test-service-resource".to_string(),
888 trace_id: 111,
889 span_id: 222,
890 parent_id: 333,
891 start: 1,
892 duration: 5,
893 error: 0,
894 meta: HashMap::new(),
895 metrics: HashMap::new(),
896 meta_struct: HashMap::new(),
897 r#type: "".to_string(),
898 span_links: vec![],
899 span_events: vec![],
900 }]],
901 ),
902 (
903 json!([{
904 "name": "test-service-name",
905 "resource": "test-service-resource",
906 "trace_id": 111,
907 "span_id": 222,
908 "start": 1,
909 "duration": 5,
910 "meta": {},
911 }]),
912 vec![vec![pb::Span {
913 service: "".to_string(),
914 name: "test-service-name".to_string(),
915 resource: "test-service-resource".to_string(),
916 trace_id: 111,
917 span_id: 222,
918 parent_id: 0,
919 start: 1,
920 duration: 5,
921 error: 0,
922 meta: HashMap::new(),
923 metrics: HashMap::new(),
924 meta_struct: HashMap::new(),
925 r#type: "".to_string(),
926 span_links: vec![],
927 span_events: vec![],
928 }]],
929 ),
930 ];
931
932 for (trace_input, output) in pairs {
933 let bytes = rmp_serde::to_vec(&vec![&trace_input]).unwrap();
934 let request = Request::builder()
935 .body(hyper_migration::Body::from(bytes))
936 .unwrap();
937 let res = get_traces_from_request_body(request.into_body()).await;
938 assert!(res.is_ok());
939 assert_eq!(res.unwrap().1, output);
940 }
941 }
942
943 #[tokio::test]
944 #[cfg_attr(miri, ignore)]
945 async fn test_get_traces_from_request_body_with_span_links() {
946 let trace_input = json!([[{
947 "service": "test-service",
948 "name": "test-name",
949 "resource": "test-resource",
950 "trace_id": 111,
951 "span_id": 222,
952 "parent_id": 333,
953 "start": 1,
954 "duration": 5,
955 "error": 0,
956 "meta": {},
957 "metrics": {},
958 "span_links": [{
959 "trace_id": 999,
960 "span_id": 888,
961 "trace_id_high": 777,
962 "attributes": {"key": "value"},
963 "tracestate": "vendor=value"
964 }]
966 }]]);
967
968 let expected_output = vec![vec![pb::Span {
969 service: "test-service".to_string(),
970 name: "test-name".to_string(),
971 resource: "test-resource".to_string(),
972 trace_id: 111,
973 span_id: 222,
974 parent_id: 333,
975 start: 1,
976 duration: 5,
977 error: 0,
978 meta: HashMap::new(),
979 metrics: HashMap::new(),
980 meta_struct: HashMap::new(),
981 r#type: String::new(),
982 span_links: vec![pb::SpanLink {
983 trace_id: 999,
984 span_id: 888,
985 trace_id_high: 777,
986 attributes: HashMap::from([("key".to_string(), "value".to_string())]),
987 tracestate: "vendor=value".to_string(),
988 flags: 0, }],
990 span_events: vec![],
991 }]];
992
993 let bytes = rmp_serde::to_vec(&trace_input).unwrap();
994 let request = Request::builder()
995 .body(hyper_migration::Body::from(bytes))
996 .unwrap();
997
998 let res = get_traces_from_request_body(request.into_body()).await;
999 assert!(res.is_ok(), "Failed to deserialize: {res:?}");
1000 assert_eq!(res.unwrap().1, expected_output);
1001 }
1002
1003 #[test]
1004 fn test_get_root_span_index_from_complete_trace() {
1005 let trace = vec![
1006 create_test_span(1234, 12341, 0, 1, false),
1007 create_test_span(1234, 12342, 12341, 1, false),
1008 create_test_span(1234, 12343, 12342, 1, false),
1009 ];
1010
1011 let root_span_index = get_root_span_index(&trace);
1012 assert!(root_span_index.is_ok());
1013 assert_eq!(root_span_index.unwrap(), 0);
1014 }
1015
1016 #[test]
1017 fn test_get_root_span_index_from_partial_trace() {
1018 let trace = vec![
1019 create_test_span(1234, 12342, 12341, 1, false),
1020 create_test_span(1234, 12341, 12340, 1, false), create_test_span(1234, 12343, 12342, 1, false),
1023 ];
1024
1025 let root_span_index = get_root_span_index(&trace);
1026 assert!(root_span_index.is_ok());
1027 assert_eq!(root_span_index.unwrap(), 1);
1028 }
1029
1030 #[test]
1031 fn test_set_serverless_root_span_tags_azure_function() {
1032 let mut span = create_test_span(1234, 12342, 12341, 1, false);
1033 set_serverless_root_span_tags(
1034 &mut span,
1035 Some("test_function".to_string()),
1036 &EnvironmentType::AzureFunction,
1037 );
1038 assert_eq!(
1039 span.meta,
1040 HashMap::from([
1041 (
1042 "runtime-id".to_string(),
1043 "test-runtime-id-value".to_string()
1044 ),
1045 ("_dd.origin".to_string(), "azurefunction".to_string()),
1046 ("origin".to_string(), "azurefunction".to_string()),
1047 ("functionname".to_string(), "test_function".to_string()),
1048 ("env".to_string(), "test-env".to_string()),
1049 ("service".to_string(), "test-service".to_string())
1050 ]),
1051 );
1052 }
1053
1054 #[test]
1055 fn test_set_serverless_root_span_tags_cloud_function() {
1056 let mut span = create_test_span(1234, 12342, 12341, 1, false);
1057 set_serverless_root_span_tags(
1058 &mut span,
1059 Some("test_function".to_string()),
1060 &EnvironmentType::CloudFunction,
1061 );
1062 assert_eq!(
1063 span.meta,
1064 HashMap::from([
1065 (
1066 "runtime-id".to_string(),
1067 "test-runtime-id-value".to_string()
1068 ),
1069 ("_dd.origin".to_string(), "cloudfunction".to_string()),
1070 ("origin".to_string(), "cloudfunction".to_string()),
1071 ("functionname".to_string(), "test_function".to_string()),
1072 ("env".to_string(), "test-env".to_string()),
1073 ("service".to_string(), "test-service".to_string())
1074 ]),
1075 );
1076 }
1077
1078 #[test]
1079 fn test_has_top_level() {
1080 let top_level_span = create_test_span(123, 1234, 12, 1, true);
1081 let not_top_level_span = create_test_span(123, 1234, 12, 1, false);
1082 assert!(has_top_level(&top_level_span));
1083 assert!(!has_top_level(¬_top_level_span));
1084 }
1085
1086 #[test]
1087 fn test_is_measured() {
1088 let mut measured_span = create_test_span(123, 1234, 12, 1, true);
1089 measured_span.metrics.insert(MEASURED_KEY.into(), 1.0);
1090 let not_measured_span = create_test_span(123, 1234, 12, 1, true);
1091 assert!(is_measured(&measured_span));
1092 assert!(!is_measured(¬_measured_span));
1093 }
1094
1095 #[test]
1096 fn test_compute_top_level() {
1097 let mut span_with_different_service = create_test_span(123, 5, 2, 1, false);
1098 span_with_different_service.service = "another_service".into();
1099 let mut trace = vec![
1100 create_test_span(123, 1, 0, 1, false),
1102 create_test_span(123, 2, 1, 1, false),
1104 create_test_span(123, 4, 3, 1, false),
1107 span_with_different_service,
1110 ];
1111
1112 compute_top_level_span(trace.as_mut_slice());
1113
1114 let spans_marked_as_top_level: Vec<u64> = trace
1115 .iter()
1116 .filter_map(|span| {
1117 if has_top_level(span) {
1118 Some(span.span_id)
1119 } else {
1120 None
1121 }
1122 })
1123 .collect();
1124 assert_eq!(spans_marked_as_top_level, [1, 4, 5])
1125 }
1126
1127 #[test]
1128 fn test_collect_trace_chunks_v05() {
1129 let chunk = vec![create_test_no_alloc_span(123, 456, 789, 1, true)];
1130
1131 let collection = collect_trace_chunks(vec![chunk], true).unwrap();
1132
1133 let (dict, traces) = match collection {
1134 TraceChunks::V05(payload) => payload,
1135 _ => panic!("Unexpected type"),
1136 };
1137
1138 assert_eq!(dict.len(), 16);
1139
1140 let span = &traces[0][0];
1141 assert_eq!(span.service, 1);
1142 assert_eq!(span.name, 2);
1143 assert_eq!(span.resource, 3);
1144 assert_eq!(span.trace_id, 123);
1145 assert_eq!(span.span_id, 456);
1146 assert_eq!(span.parent_id, 789);
1147 assert_eq!(span.start, 1);
1148 assert_eq!(span.error, 0);
1149 assert_eq!(span.error, 0);
1150 assert_eq!(span.r#type, 15);
1151 assert_eq!(
1152 *span
1153 .meta
1154 .get(&find_index_in_dict(&dict, "service").unwrap())
1155 .unwrap(),
1156 find_index_in_dict(&dict, "test-service").unwrap()
1157 );
1158 assert_eq!(
1159 *span
1160 .meta
1161 .get(&find_index_in_dict(&dict, "env").unwrap())
1162 .unwrap(),
1163 find_index_in_dict(&dict, "test-env").unwrap()
1164 );
1165 assert_eq!(
1166 *span
1167 .meta
1168 .get(&find_index_in_dict(&dict, "runtime-id").unwrap())
1169 .unwrap(),
1170 find_index_in_dict(&dict, "test-runtime-id-value").unwrap()
1171 );
1172 assert_eq!(
1173 *span
1174 .meta
1175 .get(&find_index_in_dict(&dict, "_dd.origin").unwrap())
1176 .unwrap(),
1177 find_index_in_dict(&dict, "cloudfunction").unwrap()
1178 );
1179 assert_eq!(
1180 *span
1181 .meta
1182 .get(&find_index_in_dict(&dict, "origin").unwrap())
1183 .unwrap(),
1184 find_index_in_dict(&dict, "cloudfunction").unwrap()
1185 );
1186 assert_eq!(
1187 *span
1188 .meta
1189 .get(&find_index_in_dict(&dict, "functionname").unwrap())
1190 .unwrap(),
1191 find_index_in_dict(&dict, "dummy_function_name").unwrap()
1192 );
1193 assert_eq!(
1194 *span
1195 .metrics
1196 .get(&find_index_in_dict(&dict, "_top_level").unwrap())
1197 .unwrap(),
1198 1.0
1199 );
1200 }
1201}