Skip to main content

libdd_trace_utils/msgpack_decoder/v05/
mod.rs

1// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::msgpack_decoder::decode::error::DecodeError;
5use crate::msgpack_decoder::decode::{
6    buffer::Buffer, map::read_map_len, number::read_number, string::handle_null_marker,
7};
8use crate::span::v04::{Span, SpanBytes, SpanSlice};
9use crate::span::DeserializableTraceData;
10use std::collections::HashMap;
11
/// Number of elements expected in the top-level payload array; `from_buffer` rejects any other
/// length. The decoder reads the string dictionary first and the list of traces second.
const PAYLOAD_LEN: u32 = 2;
/// Number of elements expected in the array encoding a single span; `deserialize_span` rejects
/// any other length.
const SPAN_ELEM_COUNT: u32 = 12;
14
15/// Decodes a Bytes buffer into a `Vec<Vec<SpanBytes>>` object, also represented as a vector of
16/// `TracerPayloadV05` objects.
17///
18/// # Arguments
19///
20/// * `data` - A tinybytes Bytes buffer containing the encoded data. Bytes are expected to be
21///   encoded msgpack data containing a list of a list of v05 spans.
22///
23/// # Returns
24///
25/// * `Ok(Vec<TracerPayloadV05>)` - A vector of decoded `TracerPayloadV05` objects if successful.
26/// * `Err(DecodeError)` - An error if the decoding process fails.
27///
28/// # Errors
29///
30/// This function will return an error if:
31/// - The array length for trace count or span count cannot be read.
32/// - Any span cannot be decoded.
33///
34/// # Examples
35///
36/// ```
37/// use libdd_tinybytes;
38/// use libdd_trace_utils::msgpack_decoder::v05::from_bytes;
39/// use rmp_serde::to_vec;
40/// use std::collections::HashMap;
41///
42/// let data = (
43///     vec!["".to_string()],
44///     vec![vec![(
45///         0,
46///         0,
47///         0,
48///         1,
49///         2,
50///         3,
51///         4,
52///         5,
53///         6,
54///         HashMap::<u32, u32>::new(),
55///         HashMap::<u32, f64>::new(),
56///         0,
57///     )]],
58/// );
59/// let encoded_data = to_vec(&data).unwrap();
60/// let encoded_data_as_tinybytes = libdd_tinybytes::Bytes::from(encoded_data);
61/// let (decoded_traces, _payload_size) =
62///     from_bytes(encoded_data_as_tinybytes).expect("Decoding failed");
63///
64/// assert_eq!(1, decoded_traces.len());
65/// assert_eq!(1, decoded_traces[0].len());
66/// let decoded_span = &decoded_traces[0][0];
67/// assert_eq!("", decoded_span.name.as_str());
68/// ```
69pub fn from_bytes(
70    data: libdd_tinybytes::Bytes,
71) -> Result<(Vec<Vec<SpanBytes>>, usize), DecodeError> {
72    from_buffer(&mut Buffer::new(data))
73}
74
75/// Decodes a slice of bytes into a `Vec<Vec<SpanSlice>>` object.
76/// The resulting spans have the same lifetime as the initial buffer.
77///
78/// # Arguments
79///
80/// * `data` - A slice of bytes containing the encoded data. Bytes are expected to be encoded
81///   msgpack data containing a list of a list of v05 spans.
82///
83/// # Returns
84///
85/// * `Ok(Vec<Vec<SpanSlice>>)` - A vector of decoded `Vec<SpanSlice>` objects if successful.
86/// * `Err(DecodeError)` - An error if the decoding process fails.
87///
88/// # Errors
89///
90/// This function will return an error if:
91/// - The array length for trace count or span count cannot be read.
92/// - Any span cannot be decoded.
93///
94/// # Examples
95///
96/// ```
97/// use libdd_tinybytes;
98/// use libdd_trace_utils::msgpack_decoder::v05::from_slice;
99/// use rmp_serde::to_vec;
100/// use std::collections::HashMap;
101///
102/// let data = (
103///     vec!["".to_string()],
104///     vec![vec![(
105///         0,
106///         0,
107///         0,
108///         1,
109///         2,
110///         3,
111///         4,
112///         5,
113///         6,
114///         HashMap::<u32, u32>::new(),
115///         HashMap::<u32, f64>::new(),
116///         0,
117///     )]],
118/// );
119/// let encoded_data = to_vec(&data).unwrap();
120/// let encoded_data_as_tinybytes = libdd_tinybytes::Bytes::from(encoded_data);
121/// let (decoded_traces, _payload_size) =
122///     from_slice(&encoded_data_as_tinybytes).expect("Decoding failed");
123///
124/// assert_eq!(1, decoded_traces.len());
125/// assert_eq!(1, decoded_traces[0].len());
126/// let decoded_span = &decoded_traces[0][0];
127/// assert_eq!("", decoded_span.name);
128/// ```
129pub fn from_slice(data: &[u8]) -> Result<(Vec<Vec<SpanSlice<'_>>>, usize), DecodeError> {
130    from_buffer(&mut Buffer::new(data))
131}
132
133#[allow(clippy::type_complexity)]
134fn from_buffer<T: DeserializableTraceData>(
135    data: &mut Buffer<T>,
136) -> Result<(Vec<Vec<Span<T>>>, usize), DecodeError>
137where
138    T::Text: Clone,
139{
140    let data_elem = rmp::decode::read_array_len(data.as_mut_slice())
141        .map_err(|_| DecodeError::InvalidFormat("Unable to read payload len".to_string()))?;
142
143    if data_elem != PAYLOAD_LEN {
144        return Err(DecodeError::InvalidFormat(
145            "Invalid payload size".to_string(),
146        ));
147    }
148
149    let dict = deserialize_dict(data)?;
150
151    let trace_count = rmp::decode::read_array_len(data.as_mut_slice())
152        .map_err(|_| DecodeError::InvalidFormat("Unable to read trace len".to_string()))?;
153
154    let mut traces: Vec<Vec<Span<T>>> = Vec::with_capacity(trace_count as usize);
155    let start_len = data.len();
156
157    for _ in 0..trace_count {
158        let span_count = rmp::decode::read_array_len(data.as_mut_slice())
159            .map_err(|_| DecodeError::InvalidFormat("Unable to read span len".to_string()))?;
160        let mut trace: Vec<Span<T>> = Vec::with_capacity(span_count as usize);
161
162        for _ in 0..span_count {
163            let span = deserialize_span(data, &dict)?;
164            trace.push(span);
165        }
166        traces.push(trace);
167    }
168    Ok((traces, start_len - data.len()))
169}
170
171fn deserialize_dict<T: DeserializableTraceData>(
172    data: &mut Buffer<T>,
173) -> Result<Vec<T::Text>, DecodeError> {
174    let dict_len = rmp::decode::read_array_len(data.as_mut_slice())
175        .map_err(|_| DecodeError::InvalidFormat("Unable to read dictionary len".to_string()))?;
176
177    let mut dict: Vec<T::Text> = Vec::with_capacity(dict_len as usize);
178    for _ in 0..dict_len {
179        let str = data.read_string()?;
180        dict.push(str);
181    }
182    Ok(dict)
183}
184
185fn deserialize_span<T: DeserializableTraceData>(
186    data: &mut Buffer<T>,
187    dict: &[T::Text],
188) -> Result<Span<T>, DecodeError>
189where
190    T::Text: Clone,
191{
192    let mut span = Span::default();
193    let span_len = rmp::decode::read_array_len(data.as_mut_slice())
194        .map_err(|_| DecodeError::InvalidFormat("Unable to read dictionary len".to_string()))?;
195
196    if span_len != SPAN_ELEM_COUNT {
197        return Err(DecodeError::InvalidFormat(
198            "Invalid number of span fields".to_string(),
199        ));
200    }
201
202    span.service = get_from_dict(data, dict)?;
203    span.name = get_from_dict(data, dict)?;
204    span.resource = get_from_dict(data, dict)?;
205    span.trace_id = read_number::<_, u64>(data)? as u128;
206    span.span_id = read_number(data)?;
207    span.parent_id = read_number(data)?;
208    span.start = read_number(data)?;
209    span.duration = read_number(data)?;
210    span.error = read_number(data)?;
211    span.meta = read_indexed_map_to_bytes_strings(data, dict)?;
212    span.metrics = read_metrics(data, dict)?;
213    span.r#type = get_from_dict(data, dict)?;
214
215    Ok(span)
216}
217
218fn get_from_dict<T: DeserializableTraceData>(
219    data: &mut Buffer<T>,
220    dict: &[T::Text],
221) -> Result<T::Text, DecodeError>
222where
223    T::Text: Clone,
224{
225    let index: u32 = read_number(data)?;
226    match dict.get(index as usize) {
227        Some(value) => Ok(value.clone()),
228        None => Err(DecodeError::InvalidFormat(
229            "Unable to locate string in the dictionary".to_string(),
230        )),
231    }
232}
233
234fn read_indexed_map_to_bytes_strings<T: DeserializableTraceData>(
235    buf: &mut Buffer<T>,
236    dict: &[T::Text],
237) -> Result<HashMap<T::Text, T::Text>, DecodeError>
238where
239    T::Text: Clone,
240{
241    let len = rmp::decode::read_map_len(buf.as_mut_slice())
242        .map_err(|_| DecodeError::InvalidFormat("Unable to get map len for str map".to_owned()))?;
243
244    #[allow(clippy::expect_used)]
245    let mut map = HashMap::with_capacity(len.try_into().expect("Unable to cast map len to usize"));
246    for _ in 0..len {
247        let key = get_from_dict(buf, dict)?;
248        let value = get_from_dict(buf, dict)?;
249        map.insert(key, value);
250    }
251    Ok(map)
252}
253
254fn read_metrics<T: DeserializableTraceData>(
255    buf: &mut Buffer<T>,
256    dict: &[T::Text],
257) -> Result<HashMap<T::Text, f64>, DecodeError>
258where
259    T::Text: Clone,
260{
261    if handle_null_marker(buf) {
262        return Ok(HashMap::default());
263    }
264
265    let len = read_map_len(buf)?;
266
267    let mut map = HashMap::with_capacity(len);
268    for _ in 0..len {
269        let k = get_from_dict(buf, dict)?;
270        let v = read_number(buf)?;
271        map.insert(k, v);
272    }
273    Ok(map)
274}
275
#[cfg(test)]
mod tests {
    use super::*;
    use crate::span::SliceData;
    use std::collections::HashMap;

    /// A well-formed v05 span: twelve fields, string fields being dictionary indices.
    type V05Span = (
        u8,
        u8,
        u8,
        u64,
        u64,
        u64,
        i64,
        i64,
        i32,
        HashMap<u8, u8>,
        HashMap<u8, f64>,
        u8,
    );

    /// A malformed v05 span missing its last field (the type).
    type V05SpanMalformed = (
        u8,
        u8,
        u8,
        u64,
        u64,
        u64,
        i64,
        i64,
        i32,
        HashMap<u8, u8>,
        HashMap<u8, f64>,
    );

    type V05Payload = (Vec<String>, Vec<Vec<V05Span>>);
    type V05PayloadMalformed = (Vec<String>, Vec<Vec<V05SpanMalformed>>);

    #[test]
    fn deserialize_dict_test() {
        let dict = vec!["foo", "bar", "baz"];
        let mpack = rmp_serde::to_vec(&dict).unwrap();
        let mut payload = Buffer::<SliceData>::new(mpack.as_ref());

        let result = deserialize_dict(&mut payload).unwrap();
        assert_eq!(dict, result);
    }

    #[test]
    fn from_bytes_invalid_size_test() {
        // 3 empty array.
        let empty_three: Vec<u8> = vec![0x93, 0x90, 0x90];
        let result = from_bytes(libdd_tinybytes::Bytes::from(empty_three));

        // A bare `matches!` discards its result; it must be wrapped in `assert!` to check
        // anything.
        assert!(matches!(result, Err(DecodeError::InvalidFormat(_))));

        // 1 empty array
        let empty_one: Vec<u8> = vec![0x91, 0x90];
        let result = from_bytes(libdd_tinybytes::Bytes::from(empty_one));

        assert!(matches!(result, Err(DecodeError::InvalidFormat(_))));
    }

    #[test]
    fn from_bytes_test() {
        let data: V05Payload = (
            vec![
                "".to_string(),
                "item".to_string(),
                "version".to_string(),
                "7.0".to_string(),
                "my-name".to_string(),
                "X".to_string(),
                "my-service".to_string(),
                "my-resource".to_string(),
                "_dd.sampling_rate_whatever".to_string(),
                "value whatever".to_string(),
                "sql".to_string(),
            ],
            vec![vec![(
                6,
                4,
                7,
                1,
                2,
                3,
                123,
                456,
                1,
                HashMap::from([(8, 9), (0, 1), (2, 3)]),
                HashMap::from([(5, 1.2)]),
                10,
            )]],
        );
        let msgpack = rmp_serde::to_vec(&data).unwrap();
        let (traces, _) = from_bytes(libdd_tinybytes::Bytes::from(msgpack)).unwrap();

        let span = &traces[0][0];
        assert_eq!(span.service.as_str(), "my-service");
        assert_eq!(span.name.as_str(), "my-name");
        assert_eq!(span.resource.as_str(), "my-resource");
        assert_eq!(span.trace_id, 1);
        assert_eq!(span.span_id, 2);
        assert_eq!(span.parent_id, 3);
        assert_eq!(span.start, 123);
        assert_eq!(span.duration, 456);
        assert_eq!(span.error, 1);
        assert_eq!(span.meta.len(), 3);
        assert_eq!(
            span.meta
                .get("_dd.sampling_rate_whatever")
                .unwrap()
                .as_str(),
            "value whatever"
        );
        assert_eq!(span.meta.get("").unwrap().as_str(), "item");
        assert_eq!(span.meta.get("version").unwrap().as_str(), "7.0");
        assert_eq!(span.metrics.len(), 1);
        assert_eq!(*span.metrics.get("X").unwrap(), 1.2_f64);
        assert_eq!(span.r#type.as_str(), "sql");
    }

    #[test]
    fn missing_dict_elements_test() {
        // The span type refers to index 10 while the dictionary only holds indices 0 to 9.
        let data: V05Payload = (
            vec![
                "".to_string(),
                "item".to_string(),
                "version".to_string(),
                "7.0".to_string(),
                "my-name".to_string(),
                "X".to_string(),
                "my-service".to_string(),
                "my-resource".to_string(),
                "_dd.sampling_rate_whatever".to_string(),
                "value whatever".to_string(),
            ],
            vec![vec![(
                6,
                4,
                7,
                1,
                2,
                3,
                123,
                456,
                1,
                HashMap::from([(8, 9), (0, 1), (2, 3)]),
                HashMap::from([(5, 1.2)]),
                10,
            )]],
        );
        // Hand the encoded bytes over by value: no need to fake a 'static lifetime with unsafe.
        let payload = rmp_serde::to_vec(&data).unwrap();
        let result = from_bytes(libdd_tinybytes::Bytes::from(payload));

        // Unable to locate string in the dictionary
        assert!(matches!(result, Err(DecodeError::InvalidFormat(_))));
    }

    #[test]
    fn missing_span_elements_test() {
        let data: V05PayloadMalformed = (
            vec![
                "".to_string(),
                "item".to_string(),
                "version".to_string(),
                "7.0".to_string(),
                "my-name".to_string(),
                "X".to_string(),
                "my-service".to_string(),
                "my-resource".to_string(),
                "_dd.sampling_rate_whatever".to_string(),
                "value whatever".to_string(),
            ],
            vec![vec![(
                6,
                4,
                7,
                1,
                2,
                3,
                123,
                456,
                1,
                HashMap::from([(8, 9), (0, 1), (2, 3)]),
                HashMap::from([(5, 1.2)]),
            )]],
        );

        // Hand the encoded bytes over by value: no need to fake a 'static lifetime with unsafe.
        let payload = rmp_serde::to_vec(&data).unwrap();
        let result = from_bytes(libdd_tinybytes::Bytes::from(payload));

        // Invalid number of span fields.
        assert!(matches!(result, Err(DecodeError::InvalidFormat(_))));
    }
}
483}