libdd_trace_utils/msgpack_decoder/v04/
mod.rs

1// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4pub(crate) mod span;
5
6use self::span::decode_span;
7use crate::msgpack_decoder::decode::error::DecodeError;
8use crate::span::{SpanBytes, SpanSlice};
9
10/// Decodes a Bytes buffer into a `Vec<Vec<SpanBytes>>` object, also represented as a vector of
11/// `TracerPayloadV04` objects.
12///
13/// # Arguments
14///
15/// * `data` - A tinybytes Bytes buffer containing the encoded data. Bytes are expected to be
16///   encoded msgpack data containing a list of a list of v04 spans.
17///
18/// # Returns
19///
20/// * `Ok(Vec<TracerPayloadV04>, usize)` - A vector of decoded `Vec<SpanSlice>` objects if
21///   successful. and the number of bytes in the slice used by the decoder.
22/// * `Err(DecodeError)` - An error if the decoding process fails.
23///
24/// # Errors
25///
26/// This function will return an error if:
27/// - The array length for trace count or span count cannot be read.
28/// - Any span cannot be decoded.
29///
30/// # Examples
31///
32/// ```
33/// use libdd_tinybytes;
34/// use libdd_trace_protobuf::pb::Span;
35/// use libdd_trace_utils::msgpack_decoder::v04::from_bytes;
36/// use rmp_serde::to_vec_named;
37///
38/// let span = Span {
39///     name: "test-span".to_owned(),
40///     ..Default::default()
41/// };
42/// let encoded_data = to_vec_named(&vec![vec![span]]).unwrap();
43/// let encoded_data_as_tinybytes = libdd_tinybytes::Bytes::from(encoded_data);
44/// let (decoded_traces, _payload_size) =
45///     from_bytes(encoded_data_as_tinybytes).expect("Decoding failed");
46///
47/// assert_eq!(1, decoded_traces.len());
48/// assert_eq!(1, decoded_traces[0].len());
49/// let decoded_span = &decoded_traces[0][0];
50/// assert_eq!("test-span", decoded_span.name.as_str());
51/// ```
52pub fn from_bytes(
53    data: libdd_tinybytes::Bytes,
54) -> Result<(Vec<Vec<SpanBytes>>, usize), DecodeError> {
55    let (traces_ref, size) = from_slice(data.as_ref())?;
56
57    #[allow(clippy::unwrap_used)]
58    let traces_owned = traces_ref
59        .iter()
60        .map(|trace| {
61            trace
62                .iter()
63                // Safe to unwrap since the spans use subslices of the `data` slice
64                .map(|span| span.try_to_bytes(&data).unwrap())
65                .collect()
66        })
67        .collect();
68    Ok((traces_owned, size))
69}
70
71/// Decodes a slice of bytes into a `Vec<Vec<SpanSlice>>` object.
72/// The resulting spans have the same lifetime as the initial buffer.
73///
74/// # Arguments
75///
76/// * `data` - A slice of bytes containing the encoded data. Bytes are expected to be encoded
77///   msgpack data containing a list of a list of v04 spans.
78///
79/// # Returns
80///
81/// * `Ok(Vec<TracerPayloadV04>, usize)` - A vector of decoded `Vec<SpanSlice>` objects if
82///   successful. and the number of bytes in the slice used by the decoder.
83/// * `Err(DecodeError)` - An error if the decoding process fails.
84///
85/// # Errors
86///
87/// This function will return an error if:
88/// - The array length for trace count or span count cannot be read.
89/// - Any span cannot be decoded.
90///
91/// # Examples
92///
93/// ```
94/// use libdd_tinybytes;
95/// use libdd_trace_protobuf::pb::Span;
96/// use libdd_trace_utils::msgpack_decoder::v04::from_slice;
97/// use rmp_serde::to_vec_named;
98///
99/// let span = Span {
100///     name: "test-span".to_owned(),
101///     ..Default::default()
102/// };
103/// let encoded_data = to_vec_named(&vec![vec![span]]).unwrap();
104/// let encoded_data_as_tinybytes = libdd_tinybytes::Bytes::from(encoded_data);
105/// let (decoded_traces, _payload_size) =
106///     from_slice(&encoded_data_as_tinybytes).expect("Decoding failed");
107///
108/// assert_eq!(1, decoded_traces.len());
109/// assert_eq!(1, decoded_traces[0].len());
110/// let decoded_span = &decoded_traces[0][0];
111/// assert_eq!("test-span", decoded_span.name);
112/// ```
113pub fn from_slice(mut data: &[u8]) -> Result<(Vec<Vec<SpanSlice<'_>>>, usize), DecodeError> {
114    let trace_count = rmp::decode::read_array_len(&mut data).map_err(|_| {
115        DecodeError::InvalidFormat("Unable to read array len for trace count".to_owned())
116    })?;
117
118    let start_len = data.len();
119
120    #[allow(clippy::expect_used)]
121    Ok((
122        (0..trace_count).try_fold(
123            Vec::with_capacity(
124                trace_count
125                    .try_into()
126                    .expect("Unable to cast trace_count to usize"),
127            ),
128            |mut traces, _| {
129                let span_count = rmp::decode::read_array_len(&mut data).map_err(|_| {
130                    DecodeError::InvalidFormat("Unable to read array len for span count".to_owned())
131                })?;
132
133                let trace = (0..span_count).try_fold(
134                    Vec::with_capacity(
135                        span_count
136                            .try_into()
137                            .expect("Unable to cast span_count to usize"),
138                    ),
139                    |mut trace, _| {
140                        let span = decode_span(&mut data)?;
141                        trace.push(span);
142                        Ok(trace)
143                    },
144                )?;
145
146                traces.push(trace);
147
148                Ok(traces)
149            },
150        )?,
151        start_len - data.len(),
152    ))
153}
154
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{create_test_json_span, create_test_no_alloc_span};
    use bolero::check;
    use libdd_tinybytes::{Bytes, BytesString};
    use rmp_serde::to_vec_named;
    use serde_json::json;
    use std::collections::HashMap;

    /// Bytes appended after a valid payload; the decoder is expected to ignore them.
    const TRAILING_GARBAGE: [u8; 4] = [0, 0, 0, 0];

    /// Decodes `encoded` with `from_bytes`, checks that the payload holds exactly one trace
    /// made of exactly one span, and returns that span.
    #[track_caller]
    fn decode_single_span(encoded: Vec<u8>) -> SpanBytes {
        let (mut traces, _) = from_bytes(Bytes::from(encoded)).expect("Decoding failed");

        assert_eq!(1, traces.len());
        assert_eq!(1, traces[0].len());
        traces.remove(0).remove(0)
    }

    #[test]
    fn test_empty_array() {
        // 0x90 is the msgpack marker for an array of length 0.
        let encoded_data = [0x90_u8];
        let (decoded_traces, decoded_size) = from_slice(&encoded_data).expect("Decoding failed");

        assert!(decoded_traces.is_empty());
        // The header of the outer array is not counted in the reported size.
        assert_eq!(0, decoded_size);
    }

    #[test]
    fn test_decoder_size() {
        let span = SpanBytes {
            name: BytesString::from_slice("span_name".as_ref()).unwrap(),
            ..Default::default()
        };
        let mut encoded_data = to_vec_named(&vec![vec![span]]).unwrap();
        // The decoder starts counting after the one-byte header of the outer trace array.
        let expected_size = encoded_data.len() - 1;
        encoded_data.extend_from_slice(&TRAILING_GARBAGE);
        let (_decoded_traces, decoded_size) =
            from_bytes(Bytes::from(encoded_data)).expect("Decoding failed");

        assert_eq!(expected_size, decoded_size);
    }

    #[test]
    fn test_decoder_read_string_success() {
        let expected_string = "test-service-name";
        let span = SpanBytes {
            name: BytesString::from_slice(expected_string.as_ref()).unwrap(),
            ..Default::default()
        };
        let mut encoded_data = to_vec_named(&vec![vec![span]]).unwrap();
        encoded_data.extend_from_slice(&TRAILING_GARBAGE);
        let decoded_span = decode_single_span(encoded_data);

        assert_eq!(expected_string, decoded_span.name.as_str());
    }

    #[test]
    fn test_decoder_read_null_string_success() {
        let mut span = create_test_json_span(1, 2, 0, 0, false);
        span["name"] = json!(null);
        let mut encoded_data = to_vec_named(&vec![vec![span]]).unwrap();
        encoded_data.extend_from_slice(&TRAILING_GARBAGE);
        let decoded_span = decode_single_span(encoded_data);

        assert_eq!("", decoded_span.name.as_str());
    }

    #[test]
    fn test_decoder_read_number_success() {
        let span = create_test_json_span(1, 2, 0, 0, false);
        let mut encoded_data = to_vec_named(&vec![vec![span]]).unwrap();
        encoded_data.extend_from_slice(&TRAILING_GARBAGE);
        let decoded_span = decode_single_span(encoded_data);

        assert_eq!(1, decoded_span.trace_id);
    }

    #[test]
    fn test_decoder_read_null_number_success() {
        let mut span = create_test_json_span(1, 2, 0, 0, false);
        span["trace_id"] = json!(null);
        let mut encoded_data = to_vec_named(&vec![vec![span]]).unwrap();
        encoded_data.extend_from_slice(&TRAILING_GARBAGE);
        let decoded_span = decode_single_span(encoded_data);

        assert_eq!(0, decoded_span.trace_id);
    }

    #[test]
    fn test_decoder_meta_struct_null_map_success() {
        let mut span = create_test_json_span(1, 2, 0, 0, false);
        span["meta_struct"] = json!(null);

        let encoded_data = to_vec_named(&vec![vec![span]]).unwrap();
        let decoded_span = decode_single_span(encoded_data);

        assert!(decoded_span.meta_struct.is_empty());
    }

    #[test]
    fn test_decoder_meta_struct_success() {
        let data: Vec<u8> = vec![1, 2, 3, 4];
        let mut span = create_test_no_alloc_span(1, 2, 0, 0, false);
        span.meta_struct =
            HashMap::from([(BytesString::from("meta_key"), Bytes::from(data.clone()))]);

        let encoded_data = to_vec_named(&vec![vec![span]]).unwrap();
        let decoded_span = decode_single_span(encoded_data);

        assert_eq!(
            decoded_span.meta_struct.get("meta_key").unwrap().to_vec(),
            data
        );
    }

    #[test]
    fn test_decoder_meta_fixed_map_success() {
        let expected_meta = HashMap::from([
            ("key1".to_string(), "value1".to_string()),
            ("key2".to_string(), "value2".to_string()),
        ]);

        let mut span = create_test_json_span(1, 2, 0, 0, false);
        span["meta"] = json!(expected_meta);

        let encoded_data = to_vec_named(&vec![vec![span]]).unwrap();
        let decoded_span = decode_single_span(encoded_data);

        for (key, value) in expected_meta.iter() {
            assert_eq!(
                value,
                &decoded_span.meta[&BytesString::from_slice(key.as_ref()).unwrap()].as_str()
            );
        }
    }

    #[test]
    fn test_decoder_meta_null_map_success() {
        let mut span = create_test_json_span(1, 2, 0, 0, false);
        span["meta"] = json!(null);

        let encoded_data = to_vec_named(&vec![vec![span]]).unwrap();
        let decoded_span = decode_single_span(encoded_data);

        assert!(decoded_span.meta.is_empty());
    }

    #[test]
    fn test_decoder_meta_map_16_success() {
        // 20 entries do not fit in a msgpack fixmap (max 15), forcing the map16 encoding.
        let expected_meta: HashMap<String, String> = (0..20)
            .map(|i| (format!("key {i}"), format!("value {i}")))
            .collect();

        let mut span = create_test_json_span(1, 2, 0, 0, false);
        span["meta"] = json!(expected_meta);

        let encoded_data = to_vec_named(&vec![vec![span]]).unwrap();
        let decoded_span = decode_single_span(encoded_data);

        for (key, value) in expected_meta.iter() {
            assert_eq!(
                value,
                &decoded_span.meta[&BytesString::from_slice(key.as_ref()).unwrap()].as_str()
            );
        }
    }

    #[test]
    fn test_decoder_metrics_fixed_map_success() {
        let expected_metrics = HashMap::from([("metric1", 1.23), ("metric2", 4.56)]);

        let mut span = create_test_json_span(1, 2, 0, 0, false);
        span["metrics"] = json!(expected_metrics);
        let encoded_data = to_vec_named(&vec![vec![span]]).unwrap();
        let decoded_span = decode_single_span(encoded_data);

        for (key, value) in expected_metrics.iter() {
            assert_eq!(
                value,
                &decoded_span.metrics[&BytesString::from_slice(key.as_ref()).unwrap()]
            );
        }
    }

    #[test]
    fn test_decoder_metrics_map16_success() {
        // 20 entries do not fit in a msgpack fixmap (max 15), forcing the map16 encoding.
        let expected_metrics: HashMap<String, f64> =
            (0..20).map(|i| (format!("metric{i}"), i as f64)).collect();

        let mut span = create_test_json_span(1, 2, 0, 0, false);
        span["metrics"] = json!(expected_metrics);
        let encoded_data = to_vec_named(&vec![vec![span]]).unwrap();
        let decoded_span = decode_single_span(encoded_data);

        for (key, value) in expected_metrics.iter() {
            assert_eq!(
                value,
                &decoded_span.metrics[&BytesString::from_slice(key.as_ref()).unwrap()]
            );
        }
    }

    #[test]
    fn test_decoder_metrics_null_success() {
        let mut span = create_test_json_span(1, 2, 0, 0, false);
        span["metrics"] = json!(null);
        let encoded_data = to_vec_named(&vec![vec![span]]).unwrap();
        let decoded_span = decode_single_span(encoded_data);

        assert!(decoded_span.metrics.is_empty());
    }

    #[test]
    fn test_decoder_span_link_success() {
        let expected_span_link = json!({
            "trace_id": 1,
            "trace_id_high": 0,
            "span_id": 1,
            "attributes": {
                "attr1": "test_value",
                "attr2": "test_value2"
            },
            "tracestate": "state_test",
            "flags": 0b101
        });

        let mut span = create_test_json_span(1, 2, 0, 0, false);
        span["span_links"] = json!([expected_span_link]);

        let encoded_data = to_vec_named(&vec![vec![span]]).unwrap();
        let decoded_span = decode_single_span(encoded_data);
        let decoded_link = &decoded_span.span_links[0];

        assert_eq!(expected_span_link["trace_id"], decoded_link.trace_id);
        assert_eq!(
            expected_span_link["trace_id_high"],
            decoded_link.trace_id_high
        );
        assert_eq!(expected_span_link["span_id"], decoded_link.span_id);
        assert_eq!(
            expected_span_link["tracestate"],
            decoded_link.tracestate.as_str()
        );
        assert_eq!(expected_span_link["flags"], decoded_link.flags);
        assert_eq!(
            expected_span_link["attributes"]["attr1"],
            decoded_link.attributes[&BytesString::from_slice("attr1".as_ref()).unwrap()].as_str()
        );
        assert_eq!(
            expected_span_link["attributes"]["attr2"],
            decoded_link.attributes[&BytesString::from_slice("attr2".as_ref()).unwrap()].as_str()
        );
    }

    #[test]
    fn test_decoder_null_span_link_success() {
        let mut span = create_test_json_span(1, 2, 0, 0, false);
        span["span_links"] = json!(null);

        let encoded_data = to_vec_named(&vec![vec![span]]).unwrap();
        let decoded_span = decode_single_span(encoded_data);

        assert!(decoded_span.span_links.is_empty());
    }

    #[test]
    fn test_decoder_read_string_wrong_format() {
        let span = SpanBytes {
            service: BytesString::from_slice("my_service".as_ref()).unwrap(),
            ..Default::default()
        };
        let mut encoded_data = to_vec_named(&vec![vec![span]]).unwrap();
        // This changes the map size from 11 to 12 to trigger an InvalidMarkerRead error.
        encoded_data[2] = 0x8c;

        let result = from_slice(&encoded_data);
        assert_eq!(
            Err(DecodeError::InvalidFormat(
                "Expected at least bytes 1, but only got 0 (pos 0)".to_owned()
            )),
            result
        );
    }

    #[test]
    fn test_decoder_read_string_utf8_error() {
        // Built directly from raw bytes: there is no need to go through an invalid `String`.
        let invalid_seq: Vec<u8> = vec![0, 159, 146, 150];
        let invalid_bytes = Bytes::from(invalid_seq);
        let span = SpanBytes {
            // SAFETY: not upheld on purpose. The bytes are invalid UTF-8 so that the encoded
            // payload carries a malformed string for the decoder to reject.
            name: unsafe { BytesString::from_bytes_unchecked(invalid_bytes) },
            ..Default::default()
        };
        let encoded_data = to_vec_named(&vec![vec![span]]).unwrap();

        let result = from_slice(&encoded_data);
        assert_eq!(
            Err(DecodeError::Utf8Error(
                "invalid utf-8 sequence of 1 bytes from index 1".to_owned()
            )),
            result
        );
    }

    #[test]
    fn test_decoder_invalid_marker_for_trace_count_read() {
        let span = SpanBytes::default();
        let mut encoded_data = to_vec_named(&vec![vec![span]]).unwrap();
        // This changes the entire payload to a map with 12 keys in order to trigger an error when
        // reading the array len of traces
        encoded_data[0] = 0x8c;

        let result = from_slice(&encoded_data);
        assert_eq!(
            Err(DecodeError::InvalidFormat(
                "Unable to read array len for trace count".to_string()
            )),
            result
        );
    }

    #[test]
    fn test_decoder_invalid_marker_for_span_count_read() {
        let span = SpanBytes::default();
        let mut encoded_data = to_vec_named(&vec![vec![span]]).unwrap();
        // This changes the entire payload to a map with 12 keys in order to trigger an error when
        // reading the array len of spans
        encoded_data[1] = 0x8c;

        let result = from_slice(&encoded_data);
        assert_eq!(
            Err(DecodeError::InvalidFormat(
                "Unable to read array len for span count".to_owned()
            )),
            result
        );
    }

    #[test]
    fn test_decoder_read_string_type_mismatch() {
        let span = SpanBytes::default();
        let mut encoded_data = to_vec_named(&vec![vec![span]]).unwrap();
        // Modify the encoded data to cause a type mismatch by changing the marker for the `name`
        // field to an integer marker
        encoded_data[3] = 0x01;

        let result = from_slice(&encoded_data);
        assert_eq!(
            Err(DecodeError::InvalidType(
                "Type mismatch at marker FixPos(1)".to_owned()
            )),
            result
        );
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn fuzz_from_bytes() {
        check!()
            .with_type::<(
                String,
                String,
                String,
                String,
                String,
                String,
                String,
                String,
                u64,
                u64,
                u64,
                i64,
            )>()
            .cloned()
            .for_each(
                |(
                    name,
                    service,
                    resource,
                    span_type,
                    meta_key,
                    meta_value,
                    metric_key,
                    metric_value,
                    trace_id,
                    span_id,
                    parent_id,
                    start,
                )| {
                    let span = SpanBytes {
                        name: BytesString::from_slice(name.as_ref()).unwrap(),
                        service: BytesString::from_slice(service.as_ref()).unwrap(),
                        resource: BytesString::from_slice(resource.as_ref()).unwrap(),
                        r#type: BytesString::from_slice(span_type.as_ref()).unwrap(),
                        meta: HashMap::from([(
                            BytesString::from_slice(meta_key.as_ref()).unwrap(),
                            BytesString::from_slice(meta_value.as_ref()).unwrap(),
                        )]),
                        metrics: HashMap::from([(
                            BytesString::from_slice(metric_key.as_ref()).unwrap(),
                            // Arbitrary strings rarely parse as floats; fall back to 0.0.
                            metric_value.parse::<f64>().unwrap_or_default(),
                        )]),
                        trace_id: u128::from(trace_id),
                        span_id,
                        parent_id,
                        start,
                        ..Default::default()
                    };
                    let encoded_data = to_vec_named(&vec![vec![span]]).unwrap();
                    let result = from_bytes(Bytes::from(encoded_data));

                    assert!(result.is_ok());
                },
            );
    }
}