Skip to main content

libdd_trace_utils/
tracer_payload.rs

1// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::span::v05::dict::SharedDict;
5use crate::span::{v04, v05, BytesData, SharedDictBytes, TraceData};
6use crate::trace_utils::collect_trace_chunks;
7use crate::{msgpack_decoder, trace_utils::cmp_send_data_payloads};
8use libdd_trace_protobuf::pb;
9use std::cmp::Ordering;
10use std::iter::Iterator;
11
12pub type TracerPayloadV04 = Vec<v04::SpanBytes>;
13pub type TracerPayloadV05 = Vec<v05::Span>;
14
/// Enumerates the msgpack trace encodings accepted by [`decode_to_trace_chunks`].
///
/// Derives `PartialEq`/`Eq`/`Hash` so callers can compare encodings with `==` or use them
/// as map/set keys instead of pattern-matching.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TraceEncoding {
    /// v0.4 encoding (TracerPayloadV04).
    V04,
    /// v0.5 encoding (TracerPayloadV05).
    V05,
}
23
24#[derive(Debug)]
25pub enum TraceChunks<T: TraceData> {
26    /// Collection of TraceChunkSpan.
27    V04(Vec<Vec<v04::Span<T>>>),
28    /// Collection of TraceChunkSpan with de-duplicated strings.
29    V05((SharedDict<T::Text>, Vec<Vec<v05::Span>>)),
30    /// Collection of v0.4 spans to be serialized as a V1 msgpack payload.
31    V1(Vec<Vec<v04::Span<T>>>),
32}
33
34impl TraceChunks<BytesData> {
35    pub fn into_tracer_payload_collection(self) -> TracerPayloadCollection {
36        match self {
37            TraceChunks::V04(traces) => TracerPayloadCollection::V04(traces),
38            TraceChunks::V05(traces) => TracerPayloadCollection::V05(traces),
39            // V1 uses the same underlying span structure as V04.
40            TraceChunks::V1(traces) => TracerPayloadCollection::V04(traces),
41        }
42    }
43}
44
45impl<T: TraceData> TraceChunks<T> {
46    /// Returns the number of traces in the chunk
47    pub fn size(&self) -> usize {
48        match self {
49            TraceChunks::V04(traces) => traces.len(),
50            TraceChunks::V05((_, traces)) => traces.len(),
51            TraceChunks::V1(traces) => traces.len(),
52        }
53    }
54}
55
56#[derive(Debug)]
57/// Enum representing a general abstraction for a collection of tracer payloads.
58pub enum TracerPayloadCollection {
59    /// Collection of TracerPayloads.
60    V07(Vec<pb::TracerPayload>),
61    /// Collection of TraceChunkSpan.
62    V04(Vec<Vec<v04::SpanBytes>>),
63    /// Collection of TraceChunkSpan with de-duplicated strings.
64    V05((SharedDictBytes, Vec<Vec<v05::Span>>)),
65}
66
67impl TracerPayloadCollection {
68    /// Appends `other` collection of the same type to the current collection.
69    ///
70    /// #Arguments
71    ///
72    /// * `other`: collection of the same type.
73    ///
74    /// # Examples:
75    ///
76    /// ```rust
77    /// use libdd_trace_protobuf::pb::TracerPayload;
78    /// use libdd_trace_utils::tracer_payload::TracerPayloadCollection;
79    /// let mut col1 = TracerPayloadCollection::V07(vec![TracerPayload::default()]);
80    /// let mut col2 = TracerPayloadCollection::V07(vec![TracerPayload::default()]);
81    /// col1.append(&mut col2);
82    /// ```
83    pub fn append(&mut self, other: &mut Self) {
84        match self {
85            TracerPayloadCollection::V07(dest) => {
86                if let TracerPayloadCollection::V07(src) = other {
87                    dest.append(src)
88                }
89            }
90            TracerPayloadCollection::V04(dest) => {
91                if let TracerPayloadCollection::V04(src) = other {
92                    dest.append(src)
93                }
94            }
95            // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
96            #[allow(clippy::unimplemented)]
97            TracerPayloadCollection::V05(_) => unimplemented!("Append for V05 not implemented"),
98        }
99    }
100
101    /// Merges traces that came from the same origin together to reduce the payload size.
102    ///
103    /// # Examples:
104    ///
105    /// ```rust
106    /// use libdd_trace_protobuf::pb::TracerPayload;
107    /// use libdd_trace_utils::tracer_payload::TracerPayloadCollection;
108    /// let mut col1 =
109    ///     TracerPayloadCollection::V07(vec![TracerPayload::default(), TracerPayload::default()]);
110    /// col1.merge();
111    /// ```
112    pub fn merge(&mut self) {
113        if let TracerPayloadCollection::V07(collection) = self {
114            collection.sort_unstable_by(cmp_send_data_payloads);
115            collection.dedup_by(|a, b| {
116                if cmp_send_data_payloads(a, b) == Ordering::Equal {
117                    // Note: dedup_by drops a, and retains b.
118                    b.chunks.append(&mut a.chunks);
119                    return true;
120                }
121                false
122            })
123        }
124    }
125
126    /// Computes the size of the collection.
127    ///
128    /// # Returns
129    ///
130    /// The number of traces contained in the collection.
131    ///
132    /// # Examples:
133    ///
134    /// ```rust
135    /// use libdd_trace_protobuf::pb::TracerPayload;
136    /// use libdd_trace_utils::tracer_payload::TracerPayloadCollection;
137    /// let col1 = TracerPayloadCollection::V07(vec![TracerPayload::default()]);
138    /// col1.size();
139    /// ```
140    pub fn size(&self) -> usize {
141        match self {
142            TracerPayloadCollection::V07(collection) => {
143                collection.iter().map(|s| s.chunks.len()).sum()
144            }
145            TracerPayloadCollection::V04(collection) => collection.len(),
146            TracerPayloadCollection::V05((_, collection)) => collection.len(),
147        }
148    }
149}
150
151/// A trait defining custom processing to be applied to `TraceChunks`.
152///
153/// TraceChunks are part of the v07 Trace payloads. Implementors of this trait can define specific
154/// logic to modify or enrich trace chunks and pass it to the `TracerPayloadCollection` via
155/// `TracerPayloadParams`.
156///
157/// # Examples
158///
159/// Implementing `TraceChunkProcessor` to add a custom tag to each span in a chunk:
160///
161/// ```rust
162/// use libdd_trace_protobuf::pb::{Span, TraceChunk};
163/// use libdd_trace_utils::tracer_payload::TraceChunkProcessor;
164/// use std::collections::HashMap;
165///
166/// struct CustomTagProcessor {
167///     tag_key: String,
168///     tag_value: String,
169/// }
170///
171/// impl TraceChunkProcessor for CustomTagProcessor {
172///     fn process(&mut self, chunk: &mut TraceChunk, index: usize) {
173///         for span in &mut chunk.spans {
174///             span.meta
175///                 .insert(self.tag_key.clone(), self.tag_value.clone());
176///         }
177///     }
178/// }
179/// ```
180pub trait TraceChunkProcessor {
181    fn process(&mut self, chunk: &mut pb::TraceChunk, index: usize);
182}
183
184#[derive(Default)]
185/// Default implementation of `TraceChunkProcessor` that does nothing.
186///
187/// If used, the compiler should optimize away calls to it.
188pub struct DefaultTraceChunkProcessor;
189
190impl TraceChunkProcessor for DefaultTraceChunkProcessor {
191    fn process(&mut self, _chunk: &mut pb::TraceChunk, _index: usize) {
192        // Default implementation does nothing.
193    }
194}
195
196/// This method processes the msgpack data contained within `data` based on
197/// the specified `encoding_type`, converting it into a collection of tracer payloads.
198///
199/// Note: Currently only the `TraceEncoding::V04` and `TraceEncoding::V05` encoding types are
200/// supported.
201///
202/// # Returns
203///
204/// A `Result` containing either the successfully converted `TraceChunks` and the length consummed
205/// from the data  or an error if the conversion fails. Possible errors include issues with
206/// deserializing the msgpack data or if the data does not conform to the expected format.
207///
208/// # Examples
209///
210/// ```rust
211/// use libdd_tinybytes;
212/// use libdd_trace_protobuf::pb;
213/// use libdd_trace_utils::trace_utils::TracerHeaderTags;
214/// use libdd_trace_utils::tracer_payload::{decode_to_trace_chunks, TraceEncoding};
215/// use std::convert::TryInto;
216/// // This will likely be a &[u8] slice in practice.
217/// let data: Vec<u8> = Vec::new();
218/// let data_as_bytes = libdd_tinybytes::Bytes::from(data);
219/// let result = decode_to_trace_chunks(data_as_bytes, TraceEncoding::V04)
220///     .map(|(chunks, _size)| chunks.into_tracer_payload_collection());
221///
222/// match result {
223///     Ok(collection) => println!("Successfully converted to TracerPayloadCollection."),
224///     Err(e) => println!("Failed to convert: {:?}", e),
225/// }
226/// ```
227pub fn decode_to_trace_chunks(
228    data: libdd_tinybytes::Bytes,
229    encoding_type: TraceEncoding,
230) -> Result<(TraceChunks<BytesData>, usize), anyhow::Error> {
231    let (data, size) = match encoding_type {
232        TraceEncoding::V04 => msgpack_decoder::v04::from_bytes(data),
233        TraceEncoding::V05 => msgpack_decoder::v05::from_bytes(data),
234    }
235    .map_err(|e| anyhow::format_err!("Error deserializing trace from request body: {e}"))?;
236
237    Ok((
238        collect_trace_chunks(data, matches!(encoding_type, TraceEncoding::V05))?,
239        size,
240    ))
241}
242
#[cfg(test)]
mod tests {
    use super::*;
    use crate::span::v04::SpanBytes;
    use crate::test_utils::create_test_no_alloc_span;
    use libdd_tinybytes::BytesString;
    use libdd_trace_protobuf::pb;
    use serde_json::json;
    use std::collections::HashMap;

    /// Builds a V07 collection with one payload holding one empty chunk (so `size()` is 1).
    fn create_dummy_collection_v07() -> TracerPayloadCollection {
        TracerPayloadCollection::V07(vec![pb::TracerPayload {
            container_id: "".to_string(),
            language_name: "".to_string(),
            language_version: "".to_string(),
            tracer_version: "".to_string(),
            runtime_id: "".to_string(),
            chunks: vec![pb::TraceChunk {
                priority: 0,
                origin: "".to_string(),
                spans: vec![],
                tags: Default::default(),
                dropped_trace: false,
            }],
            tags: Default::default(),
            env: "".to_string(),
            hostname: "".to_string(),
            app_version: "".to_string(),
        }])
    }

    /// Builds a single three-span v0.4 trace used as a msgpack round-trip fixture.
    fn create_trace() -> Vec<SpanBytes> {
        vec![
            // create a root span with metrics
            create_test_no_alloc_span(1234, 12341, 0, 1, true),
            create_test_no_alloc_span(1234, 12342, 12341, 1, false),
            create_test_no_alloc_span(1234, 12343, 12342, 1, false),
        ]
    }

    #[test]
    fn test_append_traces_v07() {
        let mut two_traces = create_dummy_collection_v07();
        two_traces.append(&mut create_dummy_collection_v07());

        let mut trace = create_dummy_collection_v07();

        let mut empty = TracerPayloadCollection::V07(vec![]);

        trace.append(&mut create_dummy_collection_v07());
        assert_eq!(2, trace.size());

        trace.append(&mut two_traces);
        assert_eq!(4, trace.size());

        // Appending an empty collection must not change the size.
        trace.append(&mut empty);
        assert_eq!(4, trace.size());
    }

    #[test]
    fn test_append_traces_v04() {
        fn create_trace() -> TracerPayloadCollection {
            TracerPayloadCollection::V04(vec![vec![create_test_no_alloc_span(0, 1, 0, 2, true)]])
        }

        let mut two_traces = create_trace();
        two_traces.append(&mut create_trace());

        let mut trace = create_trace();

        let mut empty = TracerPayloadCollection::V04(vec![]);

        trace.append(&mut create_trace());
        assert_eq!(2, trace.size());

        trace.append(&mut two_traces);
        assert_eq!(4, trace.size());

        // Appending an empty collection must not change the size.
        trace.append(&mut empty);
        assert_eq!(4, trace.size());
    }

    #[test]
    fn test_merge_traces() {
        let mut trace = create_dummy_collection_v07();

        trace.append(&mut create_dummy_collection_v07());
        assert_eq!(2, trace.size());

        // Two identical payloads collapse into one payload that keeps both chunks.
        trace.merge();
        assert_eq!(2, trace.size());
        if let TracerPayloadCollection::V07(collection) = trace {
            assert_eq!(1, collection.len());
        } else {
            panic!("Unexpected type");
        }
    }

    #[test]
    fn test_try_into_success() {
        let span_data1 = json!([{
            "service": "test-service",
            "name": "test-service-name",
            "resource": "test-service-resource",
            "trace_id": 111,
            "span_id": 222,
            "parent_id": 100,
            "start": 1,
            "duration": 5,
            "error": 0,
            "meta": {},
            "metrics": {},
            "type": "serverless",
        }]);

        let expected_serialized_span_data1 = vec![SpanBytes {
            service: BytesString::from_slice("test-service".as_ref()).unwrap(),
            name: BytesString::from_slice("test-service-name".as_ref()).unwrap(),
            resource: BytesString::from_slice("test-service-resource".as_ref()).unwrap(),
            trace_id: 111,
            span_id: 222,
            parent_id: 100,
            start: 1,
            duration: 5,
            error: 0,
            meta: HashMap::new(),
            metrics: HashMap::new(),
            meta_struct: HashMap::new(),
            r#type: BytesString::from_slice("serverless".as_ref()).unwrap(),
            span_links: vec![],
            span_events: vec![],
        }];

        let span_data2 = json!([{
            "service": "test-service",
            "name": "test-service-name",
            "resource": "test-service-resource",
            "trace_id": 111,
            "span_id": 333,
            "parent_id": 100,
            "start": 1,
            "duration": 5,
            "error": 1,
            "meta": {},
            "metrics": {},
            "type": "",
        }]);

        let expected_serialized_span_data2 = vec![SpanBytes {
            service: BytesString::from_slice("test-service".as_ref()).unwrap(),
            name: BytesString::from_slice("test-service-name".as_ref()).unwrap(),
            resource: BytesString::from_slice("test-service-resource".as_ref()).unwrap(),
            trace_id: 111,
            span_id: 333,
            parent_id: 100,
            start: 1,
            duration: 5,
            error: 1,
            meta: HashMap::new(),
            metrics: HashMap::new(),
            meta_struct: HashMap::new(),
            r#type: BytesString::default(),
            span_links: vec![],
            span_events: vec![],
        }];

        let data = rmp_serde::to_vec(&vec![span_data1, span_data2])
            .expect("Failed to serialize test span.");
        let data = libdd_tinybytes::Bytes::from(data);

        // `expect` surfaces the decoder's error message if decoding unexpectedly fails.
        let (chunks, _) = decode_to_trace_chunks(data, TraceEncoding::V04)
            .expect("decoding two valid v0.4 traces should succeed");
        assert_eq!(2, chunks.size());

        if let TraceChunks::V04(traces) = chunks {
            assert_eq!(expected_serialized_span_data1, traces[0]);
            assert_eq!(expected_serialized_span_data2, traces[1]);
        } else {
            panic!("expected TraceChunks::V04 from decode_to_trace_chunks");
        }
    }

    #[cfg_attr(miri, ignore)]
    #[test]
    fn test_try_into_empty() {
        // 0x90 is a msgpack fixarray of length zero: a payload containing no traces.
        let empty_data = vec![0x90];
        let data = libdd_tinybytes::Bytes::from(empty_data);

        let (collection, _) = decode_to_trace_chunks(data, TraceEncoding::V04)
            .expect("an empty msgpack array should decode successfully");
        assert_eq!(0, collection.size());
    }

    #[test]
    fn test_try_into_meta_metrics_success() {
        let dummy_trace = create_trace();
        let expected = vec![create_trace()];
        let payload = rmp_serde::to_vec_named(&expected).unwrap();
        let payload = libdd_tinybytes::Bytes::from(payload);

        let (collection, _size) = decode_to_trace_chunks(payload, TraceEncoding::V04)
            .expect("decoding a named-field v0.4 trace should succeed");
        assert_eq!(1, collection.size());
        if let TraceChunks::V04(traces) = collection {
            assert_eq!(dummy_trace, traces[0]);
        } else {
            panic!("expected TraceChunks::V04 from decode_to_trace_chunks");
        }
    }
}