1use crate::msgpack_decoder::decode::error::DecodeError;
5use crate::msgpack_decoder::decode::{
6 buffer::Buffer, map::read_map_len, number::read_number, string::handle_null_marker,
7};
8use crate::span::v04::{Span, SpanBytes, SpanSlice};
9use crate::span::DeserializableTraceData;
10use std::collections::HashMap;
11
/// Number of elements expected in the top-level payload array; `from_buffer` rejects any other
/// count with `DecodeError::InvalidFormat`.
const PAYLOAD_LEN: u32 = 2;
/// Number of fields expected in each encoded span array; `deserialize_span` rejects any other
/// count with `DecodeError::InvalidFormat`.
const SPAN_ELEM_COUNT: u32 = 12;
14
15pub fn from_bytes(
70 data: libdd_tinybytes::Bytes,
71) -> Result<(Vec<Vec<SpanBytes>>, usize), DecodeError> {
72 from_buffer(&mut Buffer::new(data))
73}
74
75pub fn from_slice(data: &[u8]) -> Result<(Vec<Vec<SpanSlice<'_>>>, usize), DecodeError> {
130 from_buffer(&mut Buffer::new(data))
131}
132
133#[allow(clippy::type_complexity)]
134fn from_buffer<T: DeserializableTraceData>(
135 data: &mut Buffer<T>,
136) -> Result<(Vec<Vec<Span<T>>>, usize), DecodeError>
137where
138 T::Text: Clone,
139{
140 let data_elem = rmp::decode::read_array_len(data.as_mut_slice())
141 .map_err(|_| DecodeError::InvalidFormat("Unable to read payload len".to_string()))?;
142
143 if data_elem != PAYLOAD_LEN {
144 return Err(DecodeError::InvalidFormat(
145 "Invalid payload size".to_string(),
146 ));
147 }
148
149 let dict = deserialize_dict(data)?;
150
151 let trace_count = rmp::decode::read_array_len(data.as_mut_slice())
152 .map_err(|_| DecodeError::InvalidFormat("Unable to read trace len".to_string()))?;
153
154 let mut traces: Vec<Vec<Span<T>>> = Vec::with_capacity(trace_count as usize);
155 let start_len = data.len();
156
157 for _ in 0..trace_count {
158 let span_count = rmp::decode::read_array_len(data.as_mut_slice())
159 .map_err(|_| DecodeError::InvalidFormat("Unable to read span len".to_string()))?;
160 let mut trace: Vec<Span<T>> = Vec::with_capacity(span_count as usize);
161
162 for _ in 0..span_count {
163 let span = deserialize_span(data, &dict)?;
164 trace.push(span);
165 }
166 traces.push(trace);
167 }
168 Ok((traces, start_len - data.len()))
169}
170
171fn deserialize_dict<T: DeserializableTraceData>(
172 data: &mut Buffer<T>,
173) -> Result<Vec<T::Text>, DecodeError> {
174 let dict_len = rmp::decode::read_array_len(data.as_mut_slice())
175 .map_err(|_| DecodeError::InvalidFormat("Unable to read dictionary len".to_string()))?;
176
177 let mut dict: Vec<T::Text> = Vec::with_capacity(dict_len as usize);
178 for _ in 0..dict_len {
179 let str = data.read_string()?;
180 dict.push(str);
181 }
182 Ok(dict)
183}
184
185fn deserialize_span<T: DeserializableTraceData>(
186 data: &mut Buffer<T>,
187 dict: &[T::Text],
188) -> Result<Span<T>, DecodeError>
189where
190 T::Text: Clone,
191{
192 let mut span = Span::default();
193 let span_len = rmp::decode::read_array_len(data.as_mut_slice())
194 .map_err(|_| DecodeError::InvalidFormat("Unable to read dictionary len".to_string()))?;
195
196 if span_len != SPAN_ELEM_COUNT {
197 return Err(DecodeError::InvalidFormat(
198 "Invalid number of span fields".to_string(),
199 ));
200 }
201
202 span.service = get_from_dict(data, dict)?;
203 span.name = get_from_dict(data, dict)?;
204 span.resource = get_from_dict(data, dict)?;
205 span.trace_id = read_number::<_, u64>(data)? as u128;
206 span.span_id = read_number(data)?;
207 span.parent_id = read_number(data)?;
208 span.start = read_number(data)?;
209 span.duration = read_number(data)?;
210 span.error = read_number(data)?;
211 span.meta = read_indexed_map_to_bytes_strings(data, dict)?;
212 span.metrics = read_metrics(data, dict)?;
213 span.r#type = get_from_dict(data, dict)?;
214
215 Ok(span)
216}
217
218fn get_from_dict<T: DeserializableTraceData>(
219 data: &mut Buffer<T>,
220 dict: &[T::Text],
221) -> Result<T::Text, DecodeError>
222where
223 T::Text: Clone,
224{
225 let index: u32 = read_number(data)?;
226 match dict.get(index as usize) {
227 Some(value) => Ok(value.clone()),
228 None => Err(DecodeError::InvalidFormat(
229 "Unable to locate string in the dictionary".to_string(),
230 )),
231 }
232}
233
234fn read_indexed_map_to_bytes_strings<T: DeserializableTraceData>(
235 buf: &mut Buffer<T>,
236 dict: &[T::Text],
237) -> Result<HashMap<T::Text, T::Text>, DecodeError>
238where
239 T::Text: Clone,
240{
241 let len = rmp::decode::read_map_len(buf.as_mut_slice())
242 .map_err(|_| DecodeError::InvalidFormat("Unable to get map len for str map".to_owned()))?;
243
244 #[allow(clippy::expect_used)]
245 let mut map = HashMap::with_capacity(len.try_into().expect("Unable to cast map len to usize"));
246 for _ in 0..len {
247 let key = get_from_dict(buf, dict)?;
248 let value = get_from_dict(buf, dict)?;
249 map.insert(key, value);
250 }
251 Ok(map)
252}
253
254fn read_metrics<T: DeserializableTraceData>(
255 buf: &mut Buffer<T>,
256 dict: &[T::Text],
257) -> Result<HashMap<T::Text, f64>, DecodeError>
258where
259 T::Text: Clone,
260{
261 if handle_null_marker(buf) {
262 return Ok(HashMap::default());
263 }
264
265 let len = read_map_len(buf)?;
266
267 let mut map = HashMap::with_capacity(len);
268 for _ in 0..len {
269 let k = get_from_dict(buf, dict)?;
270 let v = read_number(buf)?;
271 map.insert(k, v);
272 }
273 Ok(map)
274}
275
#[cfg(test)]
mod tests {
    use super::*;
    use crate::span::SliceData;
    use std::collections::HashMap;

    /// A well-formed span: twelve fields in the order `deserialize_span` reads them. String
    /// fields are stored as indices into the payload dictionary.
    type V05Span = (
        u8,
        u8,
        u8,
        u64,
        u64,
        u64,
        i64,
        i64,
        i32,
        HashMap<u8, u8>,
        HashMap<u8, f64>,
        u8,
    );

    /// Same as `V05Span` but without the trailing `type` field, so it serializes to an
    /// eleven-element array that the decoder must reject.
    type V05SpanMalformed = (
        u8,
        u8,
        u8,
        u64,
        u64,
        u64,
        i64,
        i64,
        i32,
        HashMap<u8, u8>,
        HashMap<u8, f64>,
    );

    type V05Payload = (Vec<String>, Vec<Vec<V05Span>>);
    type V05PayloadMalformed = (Vec<String>, Vec<Vec<V05SpanMalformed>>);

    /// Dictionary shared by the payload tests. Indices 0 through 9 are always present; index 10
    /// ("sql") is appended only when `with_span_type` is true.
    fn dictionary(with_span_type: bool) -> Vec<String> {
        let mut dict: Vec<String> = [
            "",
            "item",
            "version",
            "7.0",
            "my-name",
            "X",
            "my-service",
            "my-resource",
            "_dd.sampling_rate_whatever",
            "value whatever",
        ]
        .iter()
        .map(|entry| entry.to_string())
        .collect();
        if with_span_type {
            dict.push("sql".to_string());
        }
        dict
    }

    #[test]
    fn deserialize_dict_test() {
        let dict = vec!["foo", "bar", "baz"];
        let mpack = rmp_serde::to_vec(&dict).unwrap();
        let mut payload = Buffer::<SliceData>::new(mpack.as_ref());

        let result = deserialize_dict(&mut payload).unwrap();
        assert_eq!(dict, result);
    }

    #[test]
    fn from_bytes_invalid_size_test() {
        // Top-level array with three elements instead of two.
        let empty_three: [u8; 3] = [0x93, 0x90, 0x90];
        let result = from_bytes(libdd_tinybytes::Bytes::from(empty_three.to_vec()));
        assert!(matches!(result, Err(DecodeError::InvalidFormat(_))));

        // Top-level array with a single element.
        let empty_one: [u8; 2] = [0x91, 0x90];
        let result = from_bytes(libdd_tinybytes::Bytes::from(empty_one.to_vec()));
        assert!(matches!(result, Err(DecodeError::InvalidFormat(_))));
    }

    #[test]
    fn from_bytes_test() {
        let data: V05Payload = (
            dictionary(true),
            vec![vec![(
                6,
                4,
                7,
                1,
                2,
                3,
                123,
                456,
                1,
                HashMap::from([(8, 9), (0, 1), (2, 3)]),
                HashMap::from([(5, 1.2)]),
                10,
            )]],
        );
        let msgpack = rmp_serde::to_vec(&data).unwrap();
        let (traces, _) = from_bytes(libdd_tinybytes::Bytes::from(msgpack)).unwrap();

        let span = &traces[0][0];
        assert_eq!(span.service.as_str(), "my-service");
        assert_eq!(span.name.as_str(), "my-name");
        assert_eq!(span.resource.as_str(), "my-resource");
        assert_eq!(span.trace_id, 1);
        assert_eq!(span.span_id, 2);
        assert_eq!(span.parent_id, 3);
        assert_eq!(span.start, 123);
        assert_eq!(span.duration, 456);
        assert_eq!(span.error, 1);
        assert_eq!(span.meta.len(), 3);
        assert_eq!(
            span.meta
                .get("_dd.sampling_rate_whatever")
                .unwrap()
                .as_str(),
            "value whatever"
        );
        assert_eq!(span.meta.get("").unwrap().as_str(), "item");
        assert_eq!(span.meta.get("version").unwrap().as_str(), "7.0");
        assert_eq!(span.metrics.len(), 1);
        assert_eq!(*span.metrics.get("X").unwrap(), 1.2_f64);
        assert_eq!(span.r#type.as_str(), "sql");
    }

    #[test]
    fn missing_dict_elements_test() {
        // The span's `type` field points at index 10, but the dictionary stops at index 9.
        let data: V05Payload = (
            dictionary(false),
            vec![vec![(
                6,
                4,
                7,
                1,
                2,
                3,
                123,
                456,
                1,
                HashMap::from([(8, 9), (0, 1), (2, 3)]),
                HashMap::from([(5, 1.2)]),
                10,
            )]],
        );
        let payload = rmp_serde::to_vec(&data).unwrap();
        let result = from_bytes(libdd_tinybytes::Bytes::from(payload));

        assert!(matches!(result, Err(DecodeError::InvalidFormat(_))));
    }

    #[test]
    fn missing_span_elements_test() {
        // The span has only eleven fields, one short of `SPAN_ELEM_COUNT`.
        let data: V05PayloadMalformed = (
            dictionary(false),
            vec![vec![(
                6,
                4,
                7,
                1,
                2,
                3,
                123,
                456,
                1,
                HashMap::from([(8, 9), (0, 1), (2, 3)]),
                HashMap::from([(5, 1.2)]),
            )]],
        );

        let payload = rmp_serde::to_vec(&data).unwrap();
        let result = from_bytes(libdd_tinybytes::Bytes::from(payload));

        assert!(matches!(result, Err(DecodeError::InvalidFormat(_))));
    }
}