libdd_trace_utils/
tracer_payload.rs

1// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::span::v05::dict::SharedDict;
5use crate::span::{v05, SharedDictBytes, Span, SpanBytes, SpanText};
6use crate::trace_utils::collect_trace_chunks;
7use crate::{msgpack_decoder, trace_utils::cmp_send_data_payloads};
8use libdd_tinybytes::BytesString;
9use libdd_trace_protobuf::pb;
10use std::cmp::Ordering;
11use std::iter::Iterator;
12
/// Alias for a vector of [`SpanBytes`], named after the v0.4 trace encoding.
pub type TracerPayloadV04 = Vec<SpanBytes>;
/// Alias for a vector of [`v05::Span`], named after the v0.5 trace encoding.
pub type TracerPayloadV05 = Vec<v05::Span>;
15
#[derive(Debug, Clone)]
/// Enumerates the trace payload encodings that can be decoded.
pub enum TraceEncoding {
    /// v0.4 encoding (TracerPayloadV04).
    V04,
    /// v0.5 encoding (TracerPayloadV05).
    V05,
}
24
/// Trace chunks in one of two layouts, generic over the span text type `T`.
///
/// Each variant stores a vector of traces, where a trace is itself a vector of spans.
#[derive(Debug, Clone)]
pub enum TraceChunks<T: SpanText> {
    /// Collection of TraceChunkSpan.
    V04(Vec<Vec<Span<T>>>),
    /// Collection of TraceChunkSpan with de-duplicated strings.
    ///
    /// Holds the shared dictionary together with the traces made of `v05::Span`.
    V05((SharedDict<T>, Vec<Vec<v05::Span>>)),
}
32
33impl TraceChunks<BytesString> {
34    pub fn into_tracer_payload_collection(self) -> TracerPayloadCollection {
35        match self {
36            TraceChunks::V04(traces) => TracerPayloadCollection::V04(traces),
37            TraceChunks::V05(traces) => TracerPayloadCollection::V05(traces),
38        }
39    }
40}
41
42impl<T: SpanText> TraceChunks<T> {
43    /// Returns the number of traces in the chunk
44    pub fn size(&self) -> usize {
45        match self {
46            TraceChunks::V04(traces) => traces.len(),
47            TraceChunks::V05((_, traces)) => traces.len(),
48        }
49    }
50}
51
#[derive(Debug, Clone)]
/// Enum representing a general abstraction for a collection of tracer payloads.
///
/// Each variant stores the payloads in the representation of one payload version.
pub enum TracerPayloadCollection {
    /// Collection of TracerPayloads.
    V07(Vec<pb::TracerPayload>),
    /// Collection of TraceChunkSpan.
    V04(Vec<Vec<SpanBytes>>),
    /// Collection of TraceChunkSpan with de-duplicated strings.
    ///
    /// Holds the shared dictionary together with the traces made of `v05::Span`.
    V05((SharedDictBytes, Vec<Vec<v05::Span>>)),
}
62
63impl TracerPayloadCollection {
64    /// Appends `other` collection of the same type to the current collection.
65    ///
66    /// #Arguments
67    ///
68    /// * `other`: collection of the same type.
69    ///
70    /// # Examples:
71    ///
72    /// ```rust
73    /// use libdd_trace_protobuf::pb::TracerPayload;
74    /// use libdd_trace_utils::tracer_payload::TracerPayloadCollection;
75    /// let mut col1 = TracerPayloadCollection::V07(vec![TracerPayload::default()]);
76    /// let mut col2 = TracerPayloadCollection::V07(vec![TracerPayload::default()]);
77    /// col1.append(&mut col2);
78    /// ```
79    pub fn append(&mut self, other: &mut Self) {
80        match self {
81            TracerPayloadCollection::V07(dest) => {
82                if let TracerPayloadCollection::V07(src) = other {
83                    dest.append(src)
84                }
85            }
86            TracerPayloadCollection::V04(dest) => {
87                if let TracerPayloadCollection::V04(src) = other {
88                    dest.append(src)
89                }
90            }
91            // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
92            #[allow(clippy::unimplemented)]
93            TracerPayloadCollection::V05(_) => unimplemented!("Append for V05 not implemented"),
94        }
95    }
96
97    /// Merges traces that came from the same origin together to reduce the payload size.
98    ///
99    /// # Examples:
100    ///
101    /// ```rust
102    /// use libdd_trace_protobuf::pb::TracerPayload;
103    /// use libdd_trace_utils::tracer_payload::TracerPayloadCollection;
104    /// let mut col1 =
105    ///     TracerPayloadCollection::V07(vec![TracerPayload::default(), TracerPayload::default()]);
106    /// col1.merge();
107    /// ```
108    pub fn merge(&mut self) {
109        if let TracerPayloadCollection::V07(collection) = self {
110            collection.sort_unstable_by(cmp_send_data_payloads);
111            collection.dedup_by(|a, b| {
112                if cmp_send_data_payloads(a, b) == Ordering::Equal {
113                    // Note: dedup_by drops a, and retains b.
114                    b.chunks.append(&mut a.chunks);
115                    return true;
116                }
117                false
118            })
119        }
120    }
121
122    /// Computes the size of the collection.
123    ///
124    /// # Returns
125    ///
126    /// The number of traces contained in the collection.
127    ///
128    /// # Examples:
129    ///
130    /// ```rust
131    /// use libdd_trace_protobuf::pb::TracerPayload;
132    /// use libdd_trace_utils::tracer_payload::TracerPayloadCollection;
133    /// let col1 = TracerPayloadCollection::V07(vec![TracerPayload::default()]);
134    /// col1.size();
135    /// ```
136    pub fn size(&self) -> usize {
137        match self {
138            TracerPayloadCollection::V07(collection) => {
139                collection.iter().map(|s| s.chunks.len()).sum()
140            }
141            TracerPayloadCollection::V04(collection) => collection.len(),
142            TracerPayloadCollection::V05((_, collection)) => collection.len(),
143        }
144    }
145}
146
/// A trait defining custom processing to be applied to `TraceChunks`.
///
/// TraceChunks are part of the v07 Trace payloads. Implementors of this trait can define specific
/// logic to modify or enrich trace chunks and pass it to the `TracerPayloadCollection` via
/// `TracerPayloadParams`.
///
/// # Examples
///
/// Implementing `TraceChunkProcessor` to add a custom tag to each span in a chunk:
///
/// ```rust
/// use libdd_trace_protobuf::pb::{Span, TraceChunk};
/// use libdd_trace_utils::tracer_payload::TraceChunkProcessor;
/// use std::collections::HashMap;
///
/// struct CustomTagProcessor {
///     tag_key: String,
///     tag_value: String,
/// }
///
/// impl TraceChunkProcessor for CustomTagProcessor {
///     fn process(&mut self, chunk: &mut TraceChunk, index: usize) {
///         for span in &mut chunk.spans {
///             span.meta
///                 .insert(self.tag_key.clone(), self.tag_value.clone());
///         }
///     }
/// }
/// ```
pub trait TraceChunkProcessor {
    /// Processes one trace chunk in place.
    ///
    /// * `chunk`: the chunk to inspect or modify.
    /// * `index`: presumably the position of `chunk` within the data being processed —
    ///   TODO confirm against the callers, which are not visible here.
    ///
    /// Takes `&mut self`, so an implementor may keep state between calls.
    fn process(&mut self, chunk: &mut pb::TraceChunk, index: usize);
}
179
#[derive(Default)]
/// Default implementation of `TraceChunkProcessor` that does nothing.
///
/// If used, the compiler should optimize away calls to it.
///
/// This is a unit struct: it carries no state.
pub struct DefaultTraceChunkProcessor;
185
impl TraceChunkProcessor for DefaultTraceChunkProcessor {
    /// No-op: leaves `_chunk` untouched and ignores `_index`.
    fn process(&mut self, _chunk: &mut pb::TraceChunk, _index: usize) {
        // Default implementation does nothing.
    }
}
191
192/// This method processes the msgpack data contained within `data` based on
193/// the specified `encoding_type`, converting it into a collection of tracer payloads.
194///
195/// Note: Currently only the `TraceEncoding::V04` and `TraceEncoding::V05` encoding types are
196/// supported.
197///
198/// # Returns
199///
200/// A `Result` containing either the successfully converted `TraceChunks` and the length consummed
201/// from the data  or an error if the conversion fails. Possible errors include issues with
202/// deserializing the msgpack data or if the data does not conform to the expected format.
203///
204/// # Examples
205///
206/// ```rust
207/// use libdd_tinybytes;
208/// use libdd_trace_protobuf::pb;
209/// use libdd_trace_utils::trace_utils::TracerHeaderTags;
210/// use libdd_trace_utils::tracer_payload::{decode_to_trace_chunks, TraceEncoding};
211/// use std::convert::TryInto;
212/// // This will likely be a &[u8] slice in practice.
213/// let data: Vec<u8> = Vec::new();
214/// let data_as_bytes = libdd_tinybytes::Bytes::from(data);
215/// let result = decode_to_trace_chunks(data_as_bytes, TraceEncoding::V04)
216///     .map(|(chunks, _size)| chunks.into_tracer_payload_collection());
217///
218/// match result {
219///     Ok(collection) => println!("Successfully converted to TracerPayloadCollection."),
220///     Err(e) => println!("Failed to convert: {:?}", e),
221/// }
222/// ```
223pub fn decode_to_trace_chunks(
224    data: libdd_tinybytes::Bytes,
225    encoding_type: TraceEncoding,
226) -> Result<(TraceChunks<BytesString>, usize), anyhow::Error> {
227    let (data, size) = match encoding_type {
228        TraceEncoding::V04 => msgpack_decoder::v04::from_bytes(data),
229        TraceEncoding::V05 => msgpack_decoder::v05::from_bytes(data),
230    }
231    .map_err(|e| anyhow::format_err!("Error deserializing trace from request body: {e}"))?;
232
233    Ok((
234        collect_trace_chunks(data, matches!(encoding_type, TraceEncoding::V05))?,
235        size,
236    ))
237}
238
239#[cfg(test)]
240mod tests {
241    use super::*;
242    use crate::span::SpanBytes;
243    use crate::test_utils::create_test_no_alloc_span;
244    use libdd_tinybytes::BytesString;
245    use libdd_trace_protobuf::pb;
246    use serde_json::json;
247    use std::collections::HashMap;
248
249    fn create_dummy_collection_v07() -> TracerPayloadCollection {
250        TracerPayloadCollection::V07(vec![pb::TracerPayload {
251            container_id: "".to_string(),
252            language_name: "".to_string(),
253            language_version: "".to_string(),
254            tracer_version: "".to_string(),
255            runtime_id: "".to_string(),
256            chunks: vec![pb::TraceChunk {
257                priority: 0,
258                origin: "".to_string(),
259                spans: vec![],
260                tags: Default::default(),
261                dropped_trace: false,
262            }],
263            tags: Default::default(),
264            env: "".to_string(),
265            hostname: "".to_string(),
266            app_version: "".to_string(),
267        }])
268    }
269
270    fn create_trace() -> Vec<SpanBytes> {
271        vec![
272            // create a root span with metrics
273            create_test_no_alloc_span(1234, 12341, 0, 1, true),
274            create_test_no_alloc_span(1234, 12342, 12341, 1, false),
275            create_test_no_alloc_span(1234, 12343, 12342, 1, false),
276        ]
277    }
278
279    #[test]
280    fn test_append_traces_v07() {
281        let mut trace = create_dummy_collection_v07();
282
283        let empty = TracerPayloadCollection::V07(vec![]);
284
285        trace.append(&mut trace.clone());
286        assert_eq!(2, trace.size());
287
288        trace.append(&mut trace.clone());
289        assert_eq!(4, trace.size());
290
291        trace.append(&mut empty.clone());
292        assert_eq!(4, trace.size());
293    }
294
295    #[test]
296    fn test_append_traces_v04() {
297        let mut trace =
298            TracerPayloadCollection::V04(vec![vec![create_test_no_alloc_span(0, 1, 0, 2, true)]]);
299
300        let empty = TracerPayloadCollection::V04(vec![]);
301
302        trace.append(&mut trace.clone());
303        assert_eq!(2, trace.size());
304
305        trace.append(&mut trace.clone());
306        assert_eq!(4, trace.size());
307
308        trace.append(&mut empty.clone());
309        assert_eq!(4, trace.size());
310    }
311
312    #[test]
313    fn test_merge_traces() {
314        let mut trace = create_dummy_collection_v07();
315
316        trace.append(&mut trace.clone());
317        assert_eq!(2, trace.size());
318
319        trace.merge();
320        assert_eq!(2, trace.size());
321        if let TracerPayloadCollection::V07(collection) = trace {
322            assert_eq!(1, collection.len());
323        } else {
324            panic!("Unexpected type");
325        }
326    }
327
328    #[test]
329    fn test_try_into_success() {
330        let span_data1 = json!([{
331            "service": "test-service",
332            "name": "test-service-name",
333            "resource": "test-service-resource",
334            "trace_id": 111,
335            "span_id": 222,
336            "parent_id": 100,
337            "start": 1,
338            "duration": 5,
339            "error": 0,
340            "meta": {},
341            "metrics": {},
342            "type": "serverless",
343        }]);
344
345        let expected_serialized_span_data1 = vec![SpanBytes {
346            service: BytesString::from_slice("test-service".as_ref()).unwrap(),
347            name: BytesString::from_slice("test-service-name".as_ref()).unwrap(),
348            resource: BytesString::from_slice("test-service-resource".as_ref()).unwrap(),
349            trace_id: 111,
350            span_id: 222,
351            parent_id: 100,
352            start: 1,
353            duration: 5,
354            error: 0,
355            meta: HashMap::new(),
356            metrics: HashMap::new(),
357            meta_struct: HashMap::new(),
358            r#type: BytesString::from_slice("serverless".as_ref()).unwrap(),
359            span_links: vec![],
360            span_events: vec![],
361        }];
362
363        let span_data2 = json!([{
364            "service": "test-service",
365            "name": "test-service-name",
366            "resource": "test-service-resource",
367            "trace_id": 111,
368            "span_id": 333,
369            "parent_id": 100,
370            "start": 1,
371            "duration": 5,
372            "error": 1,
373            "meta": {},
374            "metrics": {},
375            "type": "",
376        }]);
377
378        let expected_serialized_span_data2 = vec![SpanBytes {
379            service: BytesString::from_slice("test-service".as_ref()).unwrap(),
380            name: BytesString::from_slice("test-service-name".as_ref()).unwrap(),
381            resource: BytesString::from_slice("test-service-resource".as_ref()).unwrap(),
382            trace_id: 111,
383            span_id: 333,
384            parent_id: 100,
385            start: 1,
386            duration: 5,
387            error: 1,
388            meta: HashMap::new(),
389            metrics: HashMap::new(),
390            meta_struct: HashMap::new(),
391            r#type: BytesString::default(),
392            span_links: vec![],
393            span_events: vec![],
394        }];
395
396        let data = rmp_serde::to_vec(&vec![span_data1, span_data2])
397            .expect("Failed to serialize test span.");
398        let data = libdd_tinybytes::Bytes::from(data);
399
400        let result = decode_to_trace_chunks(data, TraceEncoding::V04);
401
402        assert!(result.is_ok());
403
404        let (chunks, _) = result.unwrap();
405        assert_eq!(2, chunks.size());
406
407        if let TraceChunks::V04(traces) = chunks {
408            assert_eq!(expected_serialized_span_data1, traces[0]);
409            assert_eq!(expected_serialized_span_data2, traces[1]);
410        } else {
411            panic!("Invalid collection type returned for try_into");
412        }
413    }
414
415    #[cfg_attr(miri, ignore)]
416    #[test]
417    fn test_try_into_empty() {
418        let empty_data = vec![0x90];
419        let data = libdd_tinybytes::Bytes::from(empty_data);
420
421        let result = decode_to_trace_chunks(data, TraceEncoding::V04);
422
423        assert!(result.is_ok());
424
425        let (collection, _) = result.unwrap();
426        assert_eq!(0, collection.size());
427    }
428
429    #[test]
430    fn test_try_into_meta_metrics_success() {
431        let dummy_trace = create_trace();
432        let expected = vec![dummy_trace.clone()];
433        let payload = rmp_serde::to_vec_named(&expected).unwrap();
434        let payload = libdd_tinybytes::Bytes::from(payload);
435
436        let result = decode_to_trace_chunks(payload, TraceEncoding::V04);
437
438        assert!(result.is_ok());
439
440        let (collection, _size) = result.unwrap();
441        assert_eq!(1, collection.size());
442        if let TraceChunks::V04(traces) = collection {
443            assert_eq!(dummy_trace, traces[0]);
444        } else {
445            panic!("Invalid collection type returned for try_into");
446        }
447    }
448}