Skip to main content

libdd_trace_utils/
tracer_payload.rs

// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0
3
4use crate::span::v05::dict::SharedDict;
5use crate::span::{v04, v05, BytesData, SharedDictBytes, TraceData};
6use crate::trace_utils::collect_trace_chunks;
7use crate::{msgpack_decoder, trace_utils::cmp_send_data_payloads};
8use libdd_trace_protobuf::pb;
9use std::cmp::Ordering;
10use std::iter::Iterator;
11
12pub type TracerPayloadV04 = Vec<v04::SpanBytes>;
13pub type TracerPayloadV05 = Vec<v05::Span>;
14
/// Enumerates the different encoding types.
///
/// Values can be compared with `==`/`!=` in addition to being pattern-matched.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TraceEncoding {
    /// v0.4 encoding (TracerPayloadV04).
    V04,
    /// v0.5 encoding (TracerPayloadV05).
    V05,
}
23
24#[derive(Debug)]
25pub enum TraceChunks<T: TraceData> {
26    /// Collection of TraceChunkSpan.
27    V04(Vec<Vec<v04::Span<T>>>),
28    /// Collection of TraceChunkSpan with de-duplicated strings.
29    V05((SharedDict<T::Text>, Vec<Vec<v05::Span>>)),
30}
31
32impl TraceChunks<BytesData> {
33    pub fn into_tracer_payload_collection(self) -> TracerPayloadCollection {
34        match self {
35            TraceChunks::V04(traces) => TracerPayloadCollection::V04(traces),
36            TraceChunks::V05(traces) => TracerPayloadCollection::V05(traces),
37        }
38    }
39}
40
41impl<T: TraceData> TraceChunks<T> {
42    /// Returns the number of traces in the chunk
43    pub fn size(&self) -> usize {
44        match self {
45            TraceChunks::V04(traces) => traces.len(),
46            TraceChunks::V05((_, traces)) => traces.len(),
47        }
48    }
49}
50
51#[derive(Debug)]
52/// Enum representing a general abstraction for a collection of tracer payloads.
53pub enum TracerPayloadCollection {
54    /// Collection of TracerPayloads.
55    V07(Vec<pb::TracerPayload>),
56    /// Collection of TraceChunkSpan.
57    V04(Vec<Vec<v04::SpanBytes>>),
58    /// Collection of TraceChunkSpan with de-duplicated strings.
59    V05((SharedDictBytes, Vec<Vec<v05::Span>>)),
60}
61
62impl TracerPayloadCollection {
63    /// Appends `other` collection of the same type to the current collection.
64    ///
65    /// #Arguments
66    ///
67    /// * `other`: collection of the same type.
68    ///
69    /// # Examples:
70    ///
71    /// ```rust
72    /// use libdd_trace_protobuf::pb::TracerPayload;
73    /// use libdd_trace_utils::tracer_payload::TracerPayloadCollection;
74    /// let mut col1 = TracerPayloadCollection::V07(vec![TracerPayload::default()]);
75    /// let mut col2 = TracerPayloadCollection::V07(vec![TracerPayload::default()]);
76    /// col1.append(&mut col2);
77    /// ```
78    pub fn append(&mut self, other: &mut Self) {
79        match self {
80            TracerPayloadCollection::V07(dest) => {
81                if let TracerPayloadCollection::V07(src) = other {
82                    dest.append(src)
83                }
84            }
85            TracerPayloadCollection::V04(dest) => {
86                if let TracerPayloadCollection::V04(src) = other {
87                    dest.append(src)
88                }
89            }
90            // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
91            #[allow(clippy::unimplemented)]
92            TracerPayloadCollection::V05(_) => unimplemented!("Append for V05 not implemented"),
93        }
94    }
95
96    /// Merges traces that came from the same origin together to reduce the payload size.
97    ///
98    /// # Examples:
99    ///
100    /// ```rust
101    /// use libdd_trace_protobuf::pb::TracerPayload;
102    /// use libdd_trace_utils::tracer_payload::TracerPayloadCollection;
103    /// let mut col1 =
104    ///     TracerPayloadCollection::V07(vec![TracerPayload::default(), TracerPayload::default()]);
105    /// col1.merge();
106    /// ```
107    pub fn merge(&mut self) {
108        if let TracerPayloadCollection::V07(collection) = self {
109            collection.sort_unstable_by(cmp_send_data_payloads);
110            collection.dedup_by(|a, b| {
111                if cmp_send_data_payloads(a, b) == Ordering::Equal {
112                    // Note: dedup_by drops a, and retains b.
113                    b.chunks.append(&mut a.chunks);
114                    return true;
115                }
116                false
117            })
118        }
119    }
120
121    /// Computes the size of the collection.
122    ///
123    /// # Returns
124    ///
125    /// The number of traces contained in the collection.
126    ///
127    /// # Examples:
128    ///
129    /// ```rust
130    /// use libdd_trace_protobuf::pb::TracerPayload;
131    /// use libdd_trace_utils::tracer_payload::TracerPayloadCollection;
132    /// let col1 = TracerPayloadCollection::V07(vec![TracerPayload::default()]);
133    /// col1.size();
134    /// ```
135    pub fn size(&self) -> usize {
136        match self {
137            TracerPayloadCollection::V07(collection) => {
138                collection.iter().map(|s| s.chunks.len()).sum()
139            }
140            TracerPayloadCollection::V04(collection) => collection.len(),
141            TracerPayloadCollection::V05((_, collection)) => collection.len(),
142        }
143    }
144}
145
146/// A trait defining custom processing to be applied to `TraceChunks`.
147///
148/// TraceChunks are part of the v07 Trace payloads. Implementors of this trait can define specific
149/// logic to modify or enrich trace chunks and pass it to the `TracerPayloadCollection` via
150/// `TracerPayloadParams`.
151///
152/// # Examples
153///
154/// Implementing `TraceChunkProcessor` to add a custom tag to each span in a chunk:
155///
156/// ```rust
157/// use libdd_trace_protobuf::pb::{Span, TraceChunk};
158/// use libdd_trace_utils::tracer_payload::TraceChunkProcessor;
159/// use std::collections::HashMap;
160///
161/// struct CustomTagProcessor {
162///     tag_key: String,
163///     tag_value: String,
164/// }
165///
166/// impl TraceChunkProcessor for CustomTagProcessor {
167///     fn process(&mut self, chunk: &mut TraceChunk, index: usize) {
168///         for span in &mut chunk.spans {
169///             span.meta
170///                 .insert(self.tag_key.clone(), self.tag_value.clone());
171///         }
172///     }
173/// }
174/// ```
175pub trait TraceChunkProcessor {
176    fn process(&mut self, chunk: &mut pb::TraceChunk, index: usize);
177}
178
179#[derive(Default)]
180/// Default implementation of `TraceChunkProcessor` that does nothing.
181///
182/// If used, the compiler should optimize away calls to it.
183pub struct DefaultTraceChunkProcessor;
184
185impl TraceChunkProcessor for DefaultTraceChunkProcessor {
186    fn process(&mut self, _chunk: &mut pb::TraceChunk, _index: usize) {
187        // Default implementation does nothing.
188    }
189}
190
191/// This method processes the msgpack data contained within `data` based on
192/// the specified `encoding_type`, converting it into a collection of tracer payloads.
193///
194/// Note: Currently only the `TraceEncoding::V04` and `TraceEncoding::V05` encoding types are
195/// supported.
196///
197/// # Returns
198///
199/// A `Result` containing either the successfully converted `TraceChunks` and the length consummed
200/// from the data  or an error if the conversion fails. Possible errors include issues with
201/// deserializing the msgpack data or if the data does not conform to the expected format.
202///
203/// # Examples
204///
205/// ```rust
206/// use libdd_tinybytes;
207/// use libdd_trace_protobuf::pb;
208/// use libdd_trace_utils::trace_utils::TracerHeaderTags;
209/// use libdd_trace_utils::tracer_payload::{decode_to_trace_chunks, TraceEncoding};
210/// use std::convert::TryInto;
211/// // This will likely be a &[u8] slice in practice.
212/// let data: Vec<u8> = Vec::new();
213/// let data_as_bytes = libdd_tinybytes::Bytes::from(data);
214/// let result = decode_to_trace_chunks(data_as_bytes, TraceEncoding::V04)
215///     .map(|(chunks, _size)| chunks.into_tracer_payload_collection());
216///
217/// match result {
218///     Ok(collection) => println!("Successfully converted to TracerPayloadCollection."),
219///     Err(e) => println!("Failed to convert: {:?}", e),
220/// }
221/// ```
222pub fn decode_to_trace_chunks(
223    data: libdd_tinybytes::Bytes,
224    encoding_type: TraceEncoding,
225) -> Result<(TraceChunks<BytesData>, usize), anyhow::Error> {
226    let (data, size) = match encoding_type {
227        TraceEncoding::V04 => msgpack_decoder::v04::from_bytes(data),
228        TraceEncoding::V05 => msgpack_decoder::v05::from_bytes(data),
229    }
230    .map_err(|e| anyhow::format_err!("Error deserializing trace from request body: {e}"))?;
231
232    Ok((
233        collect_trace_chunks(data, matches!(encoding_type, TraceEncoding::V05))?,
234        size,
235    ))
236}
237
#[cfg(test)]
mod tests {
    use super::*;
    use crate::span::v04::SpanBytes;
    use crate::test_utils::create_test_no_alloc_span;
    use libdd_tinybytes::BytesString;
    use libdd_trace_protobuf::pb;
    use serde_json::json;
    use std::collections::HashMap;

    /// Builds a V07 collection made of one blank payload that carries one blank chunk.
    fn create_dummy_collection_v07() -> TracerPayloadCollection {
        let blank_chunk = pb::TraceChunk {
            priority: 0,
            origin: String::new(),
            spans: Vec::new(),
            tags: Default::default(),
            dropped_trace: false,
        };
        let blank_payload = pb::TracerPayload {
            container_id: String::new(),
            language_name: String::new(),
            language_version: String::new(),
            tracer_version: String::new(),
            runtime_id: String::new(),
            chunks: vec![blank_chunk],
            tags: Default::default(),
            env: String::new(),
            hostname: String::new(),
            app_version: String::new(),
        };
        TracerPayloadCollection::V07(vec![blank_payload])
    }

    /// Builds a three-span trace: a root span followed by a chain of two descendants.
    fn create_trace() -> Vec<SpanBytes> {
        // create a root span with metrics
        let root = create_test_no_alloc_span(1234, 12341, 0, 1, true);
        let child = create_test_no_alloc_span(1234, 12342, 12341, 1, false);
        let grandchild = create_test_no_alloc_span(1234, 12343, 12342, 1, false);
        vec![root, child, grandchild]
    }

    /// Appends collections built by `make_single` (one trace each) and checks the running size
    /// after each step, ending with an append of `empty` that must not change it.
    fn assert_append_accumulates(
        make_single: fn() -> TracerPayloadCollection,
        mut empty: TracerPayloadCollection,
    ) {
        let mut pair = make_single();
        pair.append(&mut make_single());

        let mut collection = make_single();
        collection.append(&mut make_single());
        assert_eq!(2, collection.size());

        collection.append(&mut pair);
        assert_eq!(4, collection.size());

        collection.append(&mut empty);
        assert_eq!(4, collection.size());
    }

    #[test]
    fn test_append_traces_v07() {
        assert_append_accumulates(
            create_dummy_collection_v07,
            TracerPayloadCollection::V07(Vec::new()),
        );
    }

    #[test]
    fn test_append_traces_v04() {
        fn single_trace_collection() -> TracerPayloadCollection {
            let span = create_test_no_alloc_span(0, 1, 0, 2, true);
            TracerPayloadCollection::V04(vec![vec![span]])
        }

        assert_append_accumulates(
            single_trace_collection,
            TracerPayloadCollection::V04(Vec::new()),
        );
    }

    #[test]
    fn test_merge_traces() {
        let mut collection = create_dummy_collection_v07();
        collection.append(&mut create_dummy_collection_v07());
        assert_eq!(2, collection.size());

        // Merging keeps every chunk but folds the two identical payloads into one.
        collection.merge();
        assert_eq!(2, collection.size());

        match collection {
            TracerPayloadCollection::V07(payloads) => assert_eq!(1, payloads.len()),
            TracerPayloadCollection::V04(_) | TracerPayloadCollection::V05(_) => {
                panic!("Unexpected type")
            }
        }
    }

    #[test]
    fn test_try_into_success() {
        let first_trace_json = json!([{
            "service": "test-service",
            "name": "test-service-name",
            "resource": "test-service-resource",
            "trace_id": 111,
            "span_id": 222,
            "parent_id": 100,
            "start": 1,
            "duration": 5,
            "error": 0,
            "meta": {},
            "metrics": {},
            "type": "serverless",
        }]);

        let first_trace_expected = vec![SpanBytes {
            service: BytesString::from_slice("test-service".as_bytes()).unwrap(),
            name: BytesString::from_slice("test-service-name".as_bytes()).unwrap(),
            resource: BytesString::from_slice("test-service-resource".as_bytes()).unwrap(),
            trace_id: 111,
            span_id: 222,
            parent_id: 100,
            start: 1,
            duration: 5,
            error: 0,
            meta: HashMap::new(),
            metrics: HashMap::new(),
            meta_struct: HashMap::new(),
            r#type: BytesString::from_slice("serverless".as_bytes()).unwrap(),
            span_links: Vec::new(),
            span_events: Vec::new(),
        }];

        let second_trace_json = json!([{
            "service": "test-service",
            "name": "test-service-name",
            "resource": "test-service-resource",
            "trace_id": 111,
            "span_id": 333,
            "parent_id": 100,
            "start": 1,
            "duration": 5,
            "error": 1,
            "meta": {},
            "metrics": {},
            "type": "",
        }]);

        let second_trace_expected = vec![SpanBytes {
            service: BytesString::from_slice("test-service".as_bytes()).unwrap(),
            name: BytesString::from_slice("test-service-name".as_bytes()).unwrap(),
            resource: BytesString::from_slice("test-service-resource".as_bytes()).unwrap(),
            trace_id: 111,
            span_id: 333,
            parent_id: 100,
            start: 1,
            duration: 5,
            error: 1,
            meta: HashMap::new(),
            metrics: HashMap::new(),
            meta_struct: HashMap::new(),
            r#type: BytesString::default(),
            span_links: Vec::new(),
            span_events: Vec::new(),
        }];

        let encoded = rmp_serde::to_vec(&vec![first_trace_json, second_trace_json])
            .expect("Failed to serialize test span.");
        let payload = libdd_tinybytes::Bytes::from(encoded);

        let decoded = decode_to_trace_chunks(payload, TraceEncoding::V04);
        assert!(decoded.is_ok());

        let (chunks, _) = decoded.unwrap();
        assert_eq!(2, chunks.size());

        match chunks {
            TraceChunks::V04(traces) => {
                assert_eq!(first_trace_expected, traces[0]);
                assert_eq!(second_trace_expected, traces[1]);
            }
            TraceChunks::V05(_) => panic!("Invalid collection type returned for try_into"),
        }
    }

    #[cfg_attr(miri, ignore)]
    #[test]
    fn test_try_into_empty() {
        // 0x90 is the msgpack encoding of an empty array.
        let empty_array: Vec<u8> = vec![0x90];
        let payload = libdd_tinybytes::Bytes::from(empty_array);

        let decoded = decode_to_trace_chunks(payload, TraceEncoding::V04);
        assert!(decoded.is_ok());

        let (chunks, _) = decoded.unwrap();
        assert_eq!(0, chunks.size());
    }

    #[test]
    fn test_try_into_meta_metrics_success() {
        let expected_trace = create_trace();
        let traces_to_encode = vec![create_trace()];
        let encoded = rmp_serde::to_vec_named(&traces_to_encode).unwrap();
        let payload = libdd_tinybytes::Bytes::from(encoded);

        let decoded = decode_to_trace_chunks(payload, TraceEncoding::V04);
        assert!(decoded.is_ok());

        let (chunks, _size) = decoded.unwrap();
        assert_eq!(1, chunks.size());

        match chunks {
            TraceChunks::V04(traces) => assert_eq!(expected_trace, traces[0]),
            TraceChunks::V05(_) => panic!("Invalid collection type returned for try_into"),
        }
    }
}
456}