libdd_trace_utils/msgpack_decoder/v05/
mod.rs

1// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::msgpack_decoder::decode::error::DecodeError;
5use crate::msgpack_decoder::decode::{
6    map::read_map_len,
7    number::read_number_slice,
8    string::{handle_null_marker, read_string_ref},
9};
10use crate::span::{SpanBytes, SpanSlice};
11use std::collections::HashMap;
12
/// Number of elements expected in the top-level payload array. `from_slice` rejects any other
/// length and then reads two items: the string dictionary followed by the array of traces.
const PAYLOAD_LEN: u32 = 2;
/// Number of elements expected in the array encoding a single span. `deserialize_span` rejects
/// any other length and then reads exactly this many fields.
const SPAN_ELEM_COUNT: u32 = 12;
15
16/// Decodes a Bytes buffer into a `Vec<Vec<SpanBytes>>` object, also represented as a vector of
17/// `TracerPayloadV05` objects.
18///
19/// # Arguments
20///
21/// * `data` - A tinybytes Bytes buffer containing the encoded data. Bytes are expected to be
22///   encoded msgpack data containing a list of a list of v05 spans.
23///
24/// # Returns
25///
26/// * `Ok(Vec<TracerPayloadV05>)` - A vector of decoded `TracerPayloadV05` objects if successful.
27/// * `Err(DecodeError)` - An error if the decoding process fails.
28///
29/// # Errors
30///
31/// This function will return an error if:
32/// - The array length for trace count or span count cannot be read.
33/// - Any span cannot be decoded.
34///
35/// # Examples
36///
37/// ```
38/// use libdd_tinybytes;
39/// use libdd_trace_utils::msgpack_decoder::v05::from_bytes;
40/// use rmp_serde::to_vec;
41/// use std::collections::HashMap;
42///
43/// let data = (
44///     vec!["".to_string()],
45///     vec![vec![(
46///         0,
47///         0,
48///         0,
49///         1,
50///         2,
51///         3,
52///         4,
53///         5,
54///         6,
55///         HashMap::<u32, u32>::new(),
56///         HashMap::<u32, f64>::new(),
57///         0,
58///     )]],
59/// );
60/// let encoded_data = to_vec(&data).unwrap();
61/// let encoded_data_as_tinybytes = libdd_tinybytes::Bytes::from(encoded_data);
62/// let (decoded_traces, _payload_size) =
63///     from_bytes(encoded_data_as_tinybytes).expect("Decoding failed");
64///
65/// assert_eq!(1, decoded_traces.len());
66/// assert_eq!(1, decoded_traces[0].len());
67/// let decoded_span = &decoded_traces[0][0];
68/// assert_eq!("", decoded_span.name.as_str());
69/// ```
70pub fn from_bytes(
71    data: libdd_tinybytes::Bytes,
72) -> Result<(Vec<Vec<SpanBytes>>, usize), DecodeError> {
73    let (traces_ref, size) = from_slice(data.as_ref())?;
74
75    #[allow(clippy::unwrap_used)]
76    let traces_owned = traces_ref
77        .iter()
78        .map(|trace| {
79            trace
80                .iter()
81                // Safe to unwrap since the spans use subslices of the `data` slice
82                .map(|span| span.try_to_bytes(&data).unwrap())
83                .collect()
84        })
85        .collect();
86    Ok((traces_owned, size))
87}
88
89/// Decodes a slice of bytes into a `Vec<Vec<SpanSlice>>` object.
90/// The resulting spans have the same lifetime as the initial buffer.
91///
92/// # Arguments
93///
94/// * `data` - A slice of bytes containing the encoded data. Bytes are expected to be encoded
95///   msgpack data containing a list of a list of v05 spans.
96///
97/// # Returns
98///
99/// * `Ok(Vec<Vec<SpanSlice>>)` - A vector of decoded `Vec<SpanSlice>` objects if successful.
100/// * `Err(DecodeError)` - An error if the decoding process fails.
101///
102/// # Errors
103///
104/// This function will return an error if:
105/// - The array length for trace count or span count cannot be read.
106/// - Any span cannot be decoded.
107///
108/// # Examples
109///
110/// ```
111/// use libdd_tinybytes;
112/// use libdd_trace_utils::msgpack_decoder::v05::from_slice;
113/// use rmp_serde::to_vec;
114/// use std::collections::HashMap;
115///
116/// let data = (
117///     vec!["".to_string()],
118///     vec![vec![(
119///         0,
120///         0,
121///         0,
122///         1,
123///         2,
124///         3,
125///         4,
126///         5,
127///         6,
128///         HashMap::<u32, u32>::new(),
129///         HashMap::<u32, f64>::new(),
130///         0,
131///     )]],
132/// );
133/// let encoded_data = to_vec(&data).unwrap();
134/// let encoded_data_as_tinybytes = libdd_tinybytes::Bytes::from(encoded_data);
135/// let (decoded_traces, _payload_size) =
136///     from_slice(&encoded_data_as_tinybytes).expect("Decoding failed");
137///
138/// assert_eq!(1, decoded_traces.len());
139/// assert_eq!(1, decoded_traces[0].len());
140/// let decoded_span = &decoded_traces[0][0];
141/// assert_eq!("", decoded_span.name);
142/// ```
143pub fn from_slice(mut data: &[u8]) -> Result<(Vec<Vec<SpanSlice<'_>>>, usize), DecodeError> {
144    let data_elem = rmp::decode::read_array_len(&mut data)
145        .map_err(|_| DecodeError::InvalidFormat("Unable to read payload len".to_string()))?;
146
147    if data_elem != PAYLOAD_LEN {
148        return Err(DecodeError::InvalidFormat(
149            "Invalid payload size".to_string(),
150        ));
151    }
152
153    let dict = deserialize_dict(&mut data)?;
154
155    let trace_count = rmp::decode::read_array_len(&mut data)
156        .map_err(|_| DecodeError::InvalidFormat("Unable to read trace len".to_string()))?;
157
158    let mut traces: Vec<Vec<SpanSlice>> = Vec::with_capacity(trace_count as usize);
159    let start_len = data.len();
160
161    for _ in 0..trace_count {
162        let span_count = rmp::decode::read_array_len(&mut data)
163            .map_err(|_| DecodeError::InvalidFormat("Unable to read span len".to_string()))?;
164        let mut trace: Vec<SpanSlice> = Vec::with_capacity(span_count as usize);
165
166        for _ in 0..span_count {
167            let span = deserialize_span(&mut data, &dict)?;
168            trace.push(span);
169        }
170        traces.push(trace);
171    }
172    Ok((traces, start_len - data.len()))
173}
174
175fn deserialize_dict<'a>(data: &mut &'a [u8]) -> Result<Vec<&'a str>, DecodeError> {
176    let dict_len = rmp::decode::read_array_len(data)
177        .map_err(|_| DecodeError::InvalidFormat("Unable to read dictionary len".to_string()))?;
178
179    let mut dict: Vec<&'a str> = Vec::with_capacity(dict_len as usize);
180    for _ in 0..dict_len {
181        let str = read_string_ref(data)?;
182        dict.push(str);
183    }
184    Ok(dict)
185}
186
187fn deserialize_span<'a>(data: &mut &[u8], dict: &[&'a str]) -> Result<SpanSlice<'a>, DecodeError> {
188    let mut span = SpanSlice::default();
189    let span_len = rmp::decode::read_array_len(data)
190        .map_err(|_| DecodeError::InvalidFormat("Unable to read dictionary len".to_string()))?;
191
192    if span_len != SPAN_ELEM_COUNT {
193        return Err(DecodeError::InvalidFormat(
194            "Invalid number of span fields".to_string(),
195        ));
196    }
197
198    span.service = get_from_dict(data, dict)?;
199    span.name = get_from_dict(data, dict)?;
200    span.resource = get_from_dict(data, dict)?;
201    span.trace_id = read_number_slice::<u64>(data)? as u128;
202    span.span_id = read_number_slice(data)?;
203    span.parent_id = read_number_slice(data)?;
204    span.start = read_number_slice(data)?;
205    span.duration = read_number_slice(data)?;
206    span.error = read_number_slice(data)?;
207    span.meta = read_indexed_map_to_bytes_strings(data, dict)?;
208    span.metrics = read_metrics(data, dict)?;
209    span.r#type = get_from_dict(data, dict)?;
210
211    Ok(span)
212}
213
214fn get_from_dict<'a>(data: &mut &[u8], dict: &[&'a str]) -> Result<&'a str, DecodeError> {
215    let index: u32 = read_number_slice(data)?;
216    match dict.get(index as usize) {
217        Some(value) => Ok(value),
218        None => Err(DecodeError::InvalidFormat(
219            "Unable to locate string in the dictionary".to_string(),
220        )),
221    }
222}
223
224fn read_indexed_map_to_bytes_strings<'a>(
225    buf: &mut &[u8],
226    dict: &[&'a str],
227) -> Result<HashMap<&'a str, &'a str>, DecodeError> {
228    let len = rmp::decode::read_map_len(buf)
229        .map_err(|_| DecodeError::InvalidFormat("Unable to get map len for str map".to_owned()))?;
230
231    #[allow(clippy::expect_used)]
232    let mut map = HashMap::with_capacity(len.try_into().expect("Unable to cast map len to usize"));
233    for _ in 0..len {
234        let key = get_from_dict(buf, dict)?;
235        let value = get_from_dict(buf, dict)?;
236        map.insert(key, value);
237    }
238    Ok(map)
239}
240
241fn read_metrics<'a>(
242    buf: &mut &[u8],
243    dict: &[&'a str],
244) -> Result<HashMap<&'a str, f64>, DecodeError> {
245    if handle_null_marker(buf) {
246        return Ok(HashMap::default());
247    }
248
249    let len = read_map_len(buf)?;
250
251    let mut map = HashMap::with_capacity(len);
252    for _ in 0..len {
253        let k = get_from_dict(buf, dict)?;
254        let v = read_number_slice(buf)?;
255        map.insert(k, v);
256    }
257    Ok(map)
258}
259
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Serializable mirror of a well-formed v05 span (12 fields).
    type V05Span = (
        u8,
        u8,
        u8,
        u64,
        u64,
        u64,
        i64,
        i64,
        i32,
        HashMap<u8, u8>,
        HashMap<u8, f64>,
        u8,
    );

    /// Serializable mirror of a v05 span missing its last field (11 fields).
    type V05SpanMalformed = (
        u8,
        u8,
        u8,
        u64,
        u64,
        u64,
        i64,
        i64,
        i32,
        HashMap<u8, u8>,
        HashMap<u8, f64>,
    );

    type V05Payload = (Vec<String>, Vec<Vec<V05Span>>);
    type V05PayloadMalformed = (Vec<String>, Vec<Vec<V05SpanMalformed>>);

    /// Dictionary shared by the payload tests. It holds indexes 0 through 9, so index 10 is only
    /// valid once a test appends an eleventh entry.
    fn base_dictionary() -> Vec<String> {
        [
            "",
            "item",
            "version",
            "7.0",
            "my-name",
            "X",
            "my-service",
            "my-resource",
            "_dd.sampling_rate_whatever",
            "value whatever",
        ]
        .iter()
        .map(|entry| entry.to_string())
        .collect()
    }

    #[test]
    fn deserialize_dict_test() {
        let dict = vec!["foo", "bar", "baz"];
        let mpack = rmp_serde::to_vec(&dict).unwrap();
        let mut payload = mpack.as_ref();

        let result = deserialize_dict(&mut payload).unwrap();
        assert_eq!(dict, result);
    }

    #[test]
    fn from_bytes_invalid_size_test() {
        // Array of three empty arrays.
        const THREE_ELEMENTS: &[u8] = &[0x93, 0x90, 0x90];
        // Array of one empty array.
        const ONE_ELEMENT: &[u8] = &[0x91, 0x90];

        for payload in [THREE_ELEMENTS, ONE_ELEMENT] {
            let result = from_bytes(libdd_tinybytes::Bytes::from_static(payload));

            assert!(matches!(
                result,
                Err(DecodeError::InvalidFormat(msg)) if msg == "Invalid payload size"
            ));
        }
    }

    #[test]
    fn from_bytes_test() {
        let mut dict = base_dictionary();
        dict.push("sql".to_string());

        let data: V05Payload = (
            dict,
            vec![vec![(
                6,
                4,
                7,
                1,
                2,
                3,
                123,
                456,
                1,
                HashMap::from([(8, 9), (0, 1), (2, 3)]),
                HashMap::from([(5, 1.2)]),
                10,
            )]],
        );
        let msgpack = rmp_serde::to_vec(&data).unwrap();
        let (traces, _) = from_bytes(libdd_tinybytes::Bytes::from(msgpack)).unwrap();

        let span = &traces[0][0];
        assert_eq!(span.service.as_str(), "my-service");
        assert_eq!(span.name.as_str(), "my-name");
        assert_eq!(span.resource.as_str(), "my-resource");
        assert_eq!(span.trace_id, 1);
        assert_eq!(span.span_id, 2);
        assert_eq!(span.parent_id, 3);
        assert_eq!(span.start, 123);
        assert_eq!(span.duration, 456);
        assert_eq!(span.error, 1);
        assert_eq!(span.meta.len(), 3);
        assert_eq!(
            span.meta
                .get("_dd.sampling_rate_whatever")
                .unwrap()
                .as_str(),
            "value whatever"
        );
        assert_eq!(span.meta.get("").unwrap().as_str(), "item");
        assert_eq!(span.meta.get("version").unwrap().as_str(), "7.0");
        assert_eq!(span.metrics.len(), 1);
        assert_eq!(*span.metrics.get("X").unwrap(), 1.2_f64);
        assert_eq!(span.r#type.as_str(), "sql");
    }

    #[test]
    fn missing_dict_elements_test() {
        // The span type refers to index 10, which the ten-entry dictionary does not contain.
        let data: V05Payload = (
            base_dictionary(),
            vec![vec![(
                6,
                4,
                7,
                1,
                2,
                3,
                123,
                456,
                1,
                HashMap::from([(8, 9), (0, 1), (2, 3)]),
                HashMap::from([(5, 1.2)]),
                10,
            )]],
        );
        let payload = rmp_serde::to_vec(&data).unwrap();
        let result = from_bytes(libdd_tinybytes::Bytes::from(payload));

        assert!(matches!(
            result,
            Err(DecodeError::InvalidFormat(msg))
                if msg == "Unable to locate string in the dictionary"
        ));
    }

    #[test]
    fn missing_span_elements_test() {
        // The span is serialized with 11 fields instead of the 12 the decoder requires.
        let data: V05PayloadMalformed = (
            base_dictionary(),
            vec![vec![(
                6,
                4,
                7,
                1,
                2,
                3,
                123,
                456,
                1,
                HashMap::from([(8, 9), (0, 1), (2, 3)]),
                HashMap::from([(5, 1.2)]),
            )]],
        );

        let payload = rmp_serde::to_vec(&data).unwrap();
        let result = from_bytes(libdd_tinybytes::Bytes::from(payload));

        assert!(matches!(
            result,
            Err(DecodeError::InvalidFormat(msg)) if msg == "Invalid number of span fields"
        ));
    }
}
465}