1use crate::span::v05::dict::SharedDict;
5use crate::span::{v05, SharedDictBytes, Span, SpanBytes, SpanText};
6use crate::trace_utils::collect_trace_chunks;
7use crate::{msgpack_decoder, trace_utils::cmp_send_data_payloads};
8use libdd_tinybytes::BytesString;
9use libdd_trace_protobuf::pb;
10use std::cmp::Ordering;
11use std::iter::Iterator;
12
13pub type TracerPayloadV04 = Vec<SpanBytes>;
14pub type TracerPayloadV05 = Vec<v05::Span>;
15
/// Selects which msgpack trace format a payload is encoded in.
///
/// The enum is fieldless, so it derives `PartialEq`/`Eq` to allow direct
/// comparisons (`encoding == TraceEncoding::V05`) in addition to pattern
/// matching.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TraceEncoding {
    /// The v0.4 encoding.
    V04,
    /// The v0.5 encoding.
    V05,
}
24
25#[derive(Debug, Clone)]
26pub enum TraceChunks<T: SpanText> {
27 V04(Vec<Vec<Span<T>>>),
29 V05((SharedDict<T>, Vec<Vec<v05::Span>>)),
31}
32
33impl TraceChunks<BytesString> {
34 pub fn into_tracer_payload_collection(self) -> TracerPayloadCollection {
35 match self {
36 TraceChunks::V04(traces) => TracerPayloadCollection::V04(traces),
37 TraceChunks::V05(traces) => TracerPayloadCollection::V05(traces),
38 }
39 }
40}
41
42impl<T: SpanText> TraceChunks<T> {
43 pub fn size(&self) -> usize {
45 match self {
46 TraceChunks::V04(traces) => traces.len(),
47 TraceChunks::V05((_, traces)) => traces.len(),
48 }
49 }
50}
51
52#[derive(Debug, Clone)]
53pub enum TracerPayloadCollection {
55 V07(Vec<pb::TracerPayload>),
57 V04(Vec<Vec<SpanBytes>>),
59 V05((SharedDictBytes, Vec<Vec<v05::Span>>)),
61}
62
63impl TracerPayloadCollection {
64 pub fn append(&mut self, other: &mut Self) {
80 match self {
81 TracerPayloadCollection::V07(dest) => {
82 if let TracerPayloadCollection::V07(src) = other {
83 dest.append(src)
84 }
85 }
86 TracerPayloadCollection::V04(dest) => {
87 if let TracerPayloadCollection::V04(src) = other {
88 dest.append(src)
89 }
90 }
91 #[allow(clippy::unimplemented)]
93 TracerPayloadCollection::V05(_) => unimplemented!("Append for V05 not implemented"),
94 }
95 }
96
97 pub fn merge(&mut self) {
109 if let TracerPayloadCollection::V07(collection) = self {
110 collection.sort_unstable_by(cmp_send_data_payloads);
111 collection.dedup_by(|a, b| {
112 if cmp_send_data_payloads(a, b) == Ordering::Equal {
113 b.chunks.append(&mut a.chunks);
115 return true;
116 }
117 false
118 })
119 }
120 }
121
122 pub fn size(&self) -> usize {
137 match self {
138 TracerPayloadCollection::V07(collection) => {
139 collection.iter().map(|s| s.chunks.len()).sum()
140 }
141 TracerPayloadCollection::V04(collection) => collection.len(),
142 TracerPayloadCollection::V05((_, collection)) => collection.len(),
143 }
144 }
145}
146
147pub trait TraceChunkProcessor {
177 fn process(&mut self, chunk: &mut pb::TraceChunk, index: usize);
178}
179
/// A stateless [`TraceChunkProcessor`] whose `process` does nothing.
///
/// Being a unit struct, it is trivially `Debug`, `Clone` and `Copy`; these
/// are derived so it can be logged and passed around freely.
#[derive(Debug, Default, Clone, Copy)]
pub struct DefaultTraceChunkProcessor;
185
186impl TraceChunkProcessor for DefaultTraceChunkProcessor {
187 fn process(&mut self, _chunk: &mut pb::TraceChunk, _index: usize) {
188 }
190}
191
192pub fn decode_to_trace_chunks(
224 data: libdd_tinybytes::Bytes,
225 encoding_type: TraceEncoding,
226) -> Result<(TraceChunks<BytesString>, usize), anyhow::Error> {
227 let (data, size) = match encoding_type {
228 TraceEncoding::V04 => msgpack_decoder::v04::from_bytes(data),
229 TraceEncoding::V05 => msgpack_decoder::v05::from_bytes(data),
230 }
231 .map_err(|e| anyhow::format_err!("Error deserializing trace from request body: {e}"))?;
232
233 Ok((
234 collect_trace_chunks(data, matches!(encoding_type, TraceEncoding::V05))?,
235 size,
236 ))
237}
238
#[cfg(test)]
mod tests {
    use super::*;
    use crate::span::SpanBytes;
    use crate::test_utils::create_test_no_alloc_span;
    use libdd_tinybytes::BytesString;
    use libdd_trace_protobuf::pb;
    use serde_json::json;
    use std::collections::HashMap;

    /// Builds a V07 collection holding one payload with a single empty chunk.
    fn create_dummy_collection_v07() -> TracerPayloadCollection {
        let chunk = pb::TraceChunk {
            priority: 0,
            origin: String::new(),
            spans: vec![],
            tags: Default::default(),
            dropped_trace: false,
        };
        let payload = pb::TracerPayload {
            container_id: String::new(),
            language_name: String::new(),
            language_version: String::new(),
            tracer_version: String::new(),
            runtime_id: String::new(),
            chunks: vec![chunk],
            tags: Default::default(),
            env: String::new(),
            hostname: String::new(),
            app_version: String::new(),
        };
        TracerPayloadCollection::V07(vec![payload])
    }

    /// Builds a three-span trace: one span with parent id 0 followed by two
    /// spans that each name the previous span as parent.
    fn create_trace() -> Vec<SpanBytes> {
        let root = create_test_no_alloc_span(1234, 12341, 0, 1, true);
        let child = create_test_no_alloc_span(1234, 12342, 12341, 1, false);
        let grandchild = create_test_no_alloc_span(1234, 12343, 12342, 1, false);
        vec![root, child, grandchild]
    }

    #[test]
    fn test_append_traces_v07() {
        let mut collection = create_dummy_collection_v07();

        // Appending a copy of itself doubles the chunk count each time.
        for expected in [2, 4] {
            let mut snapshot = collection.clone();
            collection.append(&mut snapshot);
            assert_eq!(expected, collection.size());
        }

        // Appending an empty collection changes nothing.
        let mut empty = TracerPayloadCollection::V07(vec![]);
        collection.append(&mut empty);
        assert_eq!(4, collection.size());
    }

    #[test]
    fn test_append_traces_v04() {
        let single_chunk = vec![create_test_no_alloc_span(0, 1, 0, 2, true)];
        let mut collection = TracerPayloadCollection::V04(vec![single_chunk]);

        // Appending a copy of itself doubles the chunk count each time.
        for expected in [2, 4] {
            let mut snapshot = collection.clone();
            collection.append(&mut snapshot);
            assert_eq!(expected, collection.size());
        }

        // Appending an empty collection changes nothing.
        let mut empty = TracerPayloadCollection::V04(vec![]);
        collection.append(&mut empty);
        assert_eq!(4, collection.size());
    }

    #[test]
    fn test_merge_traces() {
        let mut collection = create_dummy_collection_v07();

        let mut duplicate = collection.clone();
        collection.append(&mut duplicate);
        assert_eq!(2, collection.size());

        // Merging keeps every chunk but folds the two identical payloads.
        collection.merge();
        assert_eq!(2, collection.size());
        match collection {
            TracerPayloadCollection::V07(payloads) => assert_eq!(1, payloads.len()),
            _ => panic!("Unexpected type"),
        }
    }

    #[test]
    fn test_try_into_success() {
        /// Shorthand for building a `BytesString` from a string literal.
        fn text(value: &str) -> BytesString {
            BytesString::from_slice(value.as_ref()).unwrap()
        }

        let first_json = json!([{
            "service": "test-service",
            "name": "test-service-name",
            "resource": "test-service-resource",
            "trace_id": 111,
            "span_id": 222,
            "parent_id": 100,
            "start": 1,
            "duration": 5,
            "error": 0,
            "meta": {},
            "metrics": {},
            "type": "serverless",
        }]);
        let second_json = json!([{
            "service": "test-service",
            "name": "test-service-name",
            "resource": "test-service-resource",
            "trace_id": 111,
            "span_id": 333,
            "parent_id": 100,
            "start": 1,
            "duration": 5,
            "error": 1,
            "meta": {},
            "metrics": {},
            "type": "",
        }]);

        let first_span = SpanBytes {
            service: text("test-service"),
            name: text("test-service-name"),
            resource: text("test-service-resource"),
            trace_id: 111,
            span_id: 222,
            parent_id: 100,
            start: 1,
            duration: 5,
            error: 0,
            meta: HashMap::new(),
            metrics: HashMap::new(),
            meta_struct: HashMap::new(),
            r#type: text("serverless"),
            span_links: vec![],
            span_events: vec![],
        };
        // The second span differs only in id, error flag and (empty) type.
        let second_span = SpanBytes {
            span_id: 333,
            error: 1,
            r#type: BytesString::default(),
            ..first_span.clone()
        };
        let expected_first_chunk = vec![first_span];
        let expected_second_chunk = vec![second_span];

        let encoded = rmp_serde::to_vec(&vec![first_json, second_json])
            .expect("Failed to serialize test span.");
        let payload = libdd_tinybytes::Bytes::from(encoded);

        let result = decode_to_trace_chunks(payload, TraceEncoding::V04);
        assert!(result.is_ok());

        let (chunks, _) = result.unwrap();
        assert_eq!(2, chunks.size());

        match chunks {
            TraceChunks::V04(traces) => {
                assert_eq!(expected_first_chunk, traces[0]);
                assert_eq!(expected_second_chunk, traces[1]);
            }
            _ => panic!("Invalid collection type returned for try_into"),
        }
    }

    #[cfg_attr(miri, ignore)]
    #[test]
    fn test_try_into_empty() {
        // 0x90 is the msgpack encoding of an empty array.
        let payload = libdd_tinybytes::Bytes::from(vec![0x90]);

        let result = decode_to_trace_chunks(payload, TraceEncoding::V04);
        assert!(result.is_ok());

        let (chunks, _) = result.unwrap();
        assert_eq!(0, chunks.size());
    }

    #[test]
    fn test_try_into_meta_metrics_success() {
        let dummy_trace = create_trace();
        let expected = vec![dummy_trace.clone()];
        let encoded = rmp_serde::to_vec_named(&expected).unwrap();
        let payload = libdd_tinybytes::Bytes::from(encoded);

        let result = decode_to_trace_chunks(payload, TraceEncoding::V04);
        assert!(result.is_ok());

        let (chunks, _size) = result.unwrap();
        assert_eq!(1, chunks.size());
        match chunks {
            TraceChunks::V04(traces) => assert_eq!(dummy_trace, traces[0]),
            _ => panic!("Invalid collection type returned for try_into"),
        }
    }
}