1use crate::msgpack_decoder::decode::error::DecodeError;
5use crate::msgpack_decoder::decode::{
6 map::read_map_len,
7 number::read_number_slice,
8 string::{handle_null_marker, read_string_ref},
9};
10use crate::span::{SpanBytes, SpanSlice};
11use std::collections::HashMap;
12
13const PAYLOAD_LEN: u32 = 2;
14const SPAN_ELEM_COUNT: u32 = 12;
15
16pub fn from_bytes(
71 data: libdd_tinybytes::Bytes,
72) -> Result<(Vec<Vec<SpanBytes>>, usize), DecodeError> {
73 let (traces_ref, size) = from_slice(data.as_ref())?;
74
75 #[allow(clippy::unwrap_used)]
76 let traces_owned = traces_ref
77 .iter()
78 .map(|trace| {
79 trace
80 .iter()
81 .map(|span| span.try_to_bytes(&data).unwrap())
83 .collect()
84 })
85 .collect();
86 Ok((traces_owned, size))
87}
88
89pub fn from_slice(mut data: &[u8]) -> Result<(Vec<Vec<SpanSlice<'_>>>, usize), DecodeError> {
144 let data_elem = rmp::decode::read_array_len(&mut data)
145 .map_err(|_| DecodeError::InvalidFormat("Unable to read payload len".to_string()))?;
146
147 if data_elem != PAYLOAD_LEN {
148 return Err(DecodeError::InvalidFormat(
149 "Invalid payload size".to_string(),
150 ));
151 }
152
153 let dict = deserialize_dict(&mut data)?;
154
155 let trace_count = rmp::decode::read_array_len(&mut data)
156 .map_err(|_| DecodeError::InvalidFormat("Unable to read trace len".to_string()))?;
157
158 let mut traces: Vec<Vec<SpanSlice>> = Vec::with_capacity(trace_count as usize);
159 let start_len = data.len();
160
161 for _ in 0..trace_count {
162 let span_count = rmp::decode::read_array_len(&mut data)
163 .map_err(|_| DecodeError::InvalidFormat("Unable to read span len".to_string()))?;
164 let mut trace: Vec<SpanSlice> = Vec::with_capacity(span_count as usize);
165
166 for _ in 0..span_count {
167 let span = deserialize_span(&mut data, &dict)?;
168 trace.push(span);
169 }
170 traces.push(trace);
171 }
172 Ok((traces, start_len - data.len()))
173}
174
175fn deserialize_dict<'a>(data: &mut &'a [u8]) -> Result<Vec<&'a str>, DecodeError> {
176 let dict_len = rmp::decode::read_array_len(data)
177 .map_err(|_| DecodeError::InvalidFormat("Unable to read dictionary len".to_string()))?;
178
179 let mut dict: Vec<&'a str> = Vec::with_capacity(dict_len as usize);
180 for _ in 0..dict_len {
181 let str = read_string_ref(data)?;
182 dict.push(str);
183 }
184 Ok(dict)
185}
186
187fn deserialize_span<'a>(data: &mut &[u8], dict: &[&'a str]) -> Result<SpanSlice<'a>, DecodeError> {
188 let mut span = SpanSlice::default();
189 let span_len = rmp::decode::read_array_len(data)
190 .map_err(|_| DecodeError::InvalidFormat("Unable to read dictionary len".to_string()))?;
191
192 if span_len != SPAN_ELEM_COUNT {
193 return Err(DecodeError::InvalidFormat(
194 "Invalid number of span fields".to_string(),
195 ));
196 }
197
198 span.service = get_from_dict(data, dict)?;
199 span.name = get_from_dict(data, dict)?;
200 span.resource = get_from_dict(data, dict)?;
201 span.trace_id = read_number_slice::<u64>(data)? as u128;
202 span.span_id = read_number_slice(data)?;
203 span.parent_id = read_number_slice(data)?;
204 span.start = read_number_slice(data)?;
205 span.duration = read_number_slice(data)?;
206 span.error = read_number_slice(data)?;
207 span.meta = read_indexed_map_to_bytes_strings(data, dict)?;
208 span.metrics = read_metrics(data, dict)?;
209 span.r#type = get_from_dict(data, dict)?;
210
211 Ok(span)
212}
213
214fn get_from_dict<'a>(data: &mut &[u8], dict: &[&'a str]) -> Result<&'a str, DecodeError> {
215 let index: u32 = read_number_slice(data)?;
216 match dict.get(index as usize) {
217 Some(value) => Ok(value),
218 None => Err(DecodeError::InvalidFormat(
219 "Unable to locate string in the dictionary".to_string(),
220 )),
221 }
222}
223
224fn read_indexed_map_to_bytes_strings<'a>(
225 buf: &mut &[u8],
226 dict: &[&'a str],
227) -> Result<HashMap<&'a str, &'a str>, DecodeError> {
228 let len = rmp::decode::read_map_len(buf)
229 .map_err(|_| DecodeError::InvalidFormat("Unable to get map len for str map".to_owned()))?;
230
231 #[allow(clippy::expect_used)]
232 let mut map = HashMap::with_capacity(len.try_into().expect("Unable to cast map len to usize"));
233 for _ in 0..len {
234 let key = get_from_dict(buf, dict)?;
235 let value = get_from_dict(buf, dict)?;
236 map.insert(key, value);
237 }
238 Ok(map)
239}
240
241fn read_metrics<'a>(
242 buf: &mut &[u8],
243 dict: &[&'a str],
244) -> Result<HashMap<&'a str, f64>, DecodeError> {
245 if handle_null_marker(buf) {
246 return Ok(HashMap::default());
247 }
248
249 let len = read_map_len(buf)?;
250
251 let mut map = HashMap::with_capacity(len);
252 for _ in 0..len {
253 let k = get_from_dict(buf, dict)?;
254 let v = read_number_slice(buf)?;
255 map.insert(k, v);
256 }
257 Ok(map)
258}
259
260#[cfg(test)]
261mod tests {
262 use super::*;
263 use std::collections::HashMap;
264 type V05Span = (
265 u8,
266 u8,
267 u8,
268 u64,
269 u64,
270 u64,
271 i64,
272 i64,
273 i32,
274 HashMap<u8, u8>,
275 HashMap<u8, f64>,
276 u8,
277 );
278
279 type V05SpanMalformed = (
280 u8,
281 u8,
282 u8,
283 u64,
284 u64,
285 u64,
286 i64,
287 i64,
288 i32,
289 HashMap<u8, u8>,
290 HashMap<u8, f64>,
291 );
292
293 type V05Payload = (Vec<String>, Vec<Vec<V05Span>>);
294 type V05PayloadMalformed = (Vec<String>, Vec<Vec<V05SpanMalformed>>);
295
296 #[test]
297 fn deserialize_dict_test() {
298 let dict = vec!["foo", "bar", "baz"];
299 let mpack = rmp_serde::to_vec(&dict).unwrap();
300 let mut payload = mpack.as_ref();
301
302 let result = deserialize_dict(&mut payload).unwrap();
303 assert_eq!(dict, result);
304 }
305
306 #[test]
307 fn from_bytes_invalid_size_test() {
308 let empty_three: [u8; 3] = [0x93, 0x90, 0x90];
310 let payload = unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(&empty_three) };
311 let bytes = libdd_tinybytes::Bytes::from_static(payload);
312 let result = from_bytes(bytes);
313
314 assert!(result.is_err());
315 matches!(result.err().unwrap(), DecodeError::InvalidFormat(_));
316
317 let empty_one: [u8; 2] = [0x91, 0x90];
319 let payload = unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(&empty_one) };
320 let bytes = libdd_tinybytes::Bytes::from_static(payload);
321 let result = from_bytes(bytes);
322
323 assert!(result.is_err());
324 matches!(result.err().unwrap(), DecodeError::InvalidFormat(_));
325 }
326
327 #[test]
328 fn from_bytes_test() {
329 let data: V05Payload = (
330 vec![
331 "".to_string(),
332 "item".to_string(),
333 "version".to_string(),
334 "7.0".to_string(),
335 "my-name".to_string(),
336 "X".to_string(),
337 "my-service".to_string(),
338 "my-resource".to_string(),
339 "_dd.sampling_rate_whatever".to_string(),
340 "value whatever".to_string(),
341 "sql".to_string(),
342 ],
343 vec![vec![(
344 6,
345 4,
346 7,
347 1,
348 2,
349 3,
350 123,
351 456,
352 1,
353 HashMap::from([(8, 9), (0, 1), (2, 3)]),
354 HashMap::from([(5, 1.2)]),
355 10,
356 )]],
357 );
358 let msgpack = rmp_serde::to_vec(&data).unwrap();
359 let (traces, _) = from_bytes(libdd_tinybytes::Bytes::from(msgpack)).unwrap();
360
361 let span = &traces[0][0];
362 assert_eq!(span.service.as_str(), "my-service");
363 assert_eq!(span.name.as_str(), "my-name");
364 assert_eq!(span.resource.as_str(), "my-resource");
365 assert_eq!(span.trace_id, 1);
366 assert_eq!(span.span_id, 2);
367 assert_eq!(span.parent_id, 3);
368 assert_eq!(span.start, 123);
369 assert_eq!(span.duration, 456);
370 assert_eq!(span.error, 1);
371 assert_eq!(span.meta.len(), 3);
372 assert_eq!(
373 span.meta
374 .get("_dd.sampling_rate_whatever")
375 .unwrap()
376 .as_str(),
377 "value whatever"
378 );
379 assert_eq!(span.meta.get("").unwrap().as_str(), "item");
380 assert_eq!(span.meta.get("version").unwrap().as_str(), "7.0");
381 assert_eq!(span.metrics.len(), 1);
382 assert_eq!(*span.metrics.get("X").unwrap(), 1.2_f64);
383 assert_eq!(span.r#type.as_str(), "sql");
384 }
385
386 #[test]
387 fn missing_dict_elements_test() {
388 let data: V05Payload = (
389 vec![
390 "".to_string(),
391 "item".to_string(),
392 "version".to_string(),
393 "7.0".to_string(),
394 "my-name".to_string(),
395 "X".to_string(),
396 "my-service".to_string(),
397 "my-resource".to_string(),
398 "_dd.sampling_rate_whatever".to_string(),
399 "value whatever".to_string(),
400 ],
401 vec![vec![(
402 6,
403 4,
404 7,
405 1,
406 2,
407 3,
408 123,
409 456,
410 1,
411 HashMap::from([(8, 9), (0, 1), (2, 3)]),
412 HashMap::from([(5, 1.2)]),
413 10,
414 )]],
415 );
416 let payload = rmp_serde::to_vec(&data).unwrap();
417 let payload = unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(&payload) };
418 let result = from_bytes(libdd_tinybytes::Bytes::from_static(payload));
419
420 assert!(result.is_err());
421
422 matches!(result.err().unwrap(), DecodeError::InvalidFormat(_));
424 }
425
426 #[test]
427 fn missing_span_elements_test() {
428 let data: V05PayloadMalformed = (
429 vec![
430 "".to_string(),
431 "item".to_string(),
432 "version".to_string(),
433 "7.0".to_string(),
434 "my-name".to_string(),
435 "X".to_string(),
436 "my-service".to_string(),
437 "my-resource".to_string(),
438 "_dd.sampling_rate_whatever".to_string(),
439 "value whatever".to_string(),
440 ],
441 vec![vec![(
442 6,
443 4,
444 7,
445 1,
446 2,
447 3,
448 123,
449 456,
450 1,
451 HashMap::from([(8, 9), (0, 1), (2, 3)]),
452 HashMap::from([(5, 1.2)]),
453 )]],
454 );
455
456 let payload = rmp_serde::to_vec(&data).unwrap();
457 let payload = unsafe { std::mem::transmute::<&'_ [u8], &'static [u8]>(&payload) };
458 let result = from_bytes(libdd_tinybytes::Bytes::from_static(payload));
459
460 assert!(result.is_err());
461
462 matches!(result.err().unwrap(), DecodeError::InvalidFormat(_));
464 }
465}