1use crate::span::v05::dict::SharedDict;
5use crate::span::{v04, v05, BytesData, SharedDictBytes, TraceData};
6use crate::trace_utils::collect_trace_chunks;
7use crate::{msgpack_decoder, trace_utils::cmp_send_data_payloads};
8use libdd_trace_protobuf::pb;
9use std::cmp::Ordering;
10use std::iter::Iterator;
11
12pub type TracerPayloadV04 = Vec<v04::SpanBytes>;
13pub type TracerPayloadV05 = Vec<v05::Span>;
14
/// Encoding of an incoming trace payload.
///
/// [`decode_to_trace_chunks`] uses this value to select the msgpack decoder.
#[derive(Debug, Clone)]
pub enum TraceEncoding {
    /// Payload is decoded with `msgpack_decoder::v04`.
    V04,
    /// Payload is decoded with `msgpack_decoder::v05`.
    V05,
}
23
24#[derive(Debug)]
25pub enum TraceChunks<T: TraceData> {
26 V04(Vec<Vec<v04::Span<T>>>),
28 V05((SharedDict<T::Text>, Vec<Vec<v05::Span>>)),
30 V1(Vec<Vec<v04::Span<T>>>),
32}
33
34impl TraceChunks<BytesData> {
35 pub fn into_tracer_payload_collection(self) -> TracerPayloadCollection {
36 match self {
37 TraceChunks::V04(traces) => TracerPayloadCollection::V04(traces),
38 TraceChunks::V05(traces) => TracerPayloadCollection::V05(traces),
39 TraceChunks::V1(traces) => TracerPayloadCollection::V04(traces),
41 }
42 }
43}
44
45impl<T: TraceData> TraceChunks<T> {
46 pub fn size(&self) -> usize {
48 match self {
49 TraceChunks::V04(traces) => traces.len(),
50 TraceChunks::V05((_, traces)) => traces.len(),
51 TraceChunks::V1(traces) => traces.len(),
52 }
53 }
54}
55
56#[derive(Debug)]
57pub enum TracerPayloadCollection {
59 V07(Vec<pb::TracerPayload>),
61 V04(Vec<Vec<v04::SpanBytes>>),
63 V05((SharedDictBytes, Vec<Vec<v05::Span>>)),
65}
66
67impl TracerPayloadCollection {
68 pub fn append(&mut self, other: &mut Self) {
84 match self {
85 TracerPayloadCollection::V07(dest) => {
86 if let TracerPayloadCollection::V07(src) = other {
87 dest.append(src)
88 }
89 }
90 TracerPayloadCollection::V04(dest) => {
91 if let TracerPayloadCollection::V04(src) = other {
92 dest.append(src)
93 }
94 }
95 #[allow(clippy::unimplemented)]
97 TracerPayloadCollection::V05(_) => unimplemented!("Append for V05 not implemented"),
98 }
99 }
100
101 pub fn merge(&mut self) {
113 if let TracerPayloadCollection::V07(collection) = self {
114 collection.sort_unstable_by(cmp_send_data_payloads);
115 collection.dedup_by(|a, b| {
116 if cmp_send_data_payloads(a, b) == Ordering::Equal {
117 b.chunks.append(&mut a.chunks);
119 return true;
120 }
121 false
122 })
123 }
124 }
125
126 pub fn size(&self) -> usize {
141 match self {
142 TracerPayloadCollection::V07(collection) => {
143 collection.iter().map(|s| s.chunks.len()).sum()
144 }
145 TracerPayloadCollection::V04(collection) => collection.len(),
146 TracerPayloadCollection::V05((_, collection)) => collection.len(),
147 }
148 }
149}
150
151pub trait TraceChunkProcessor {
181 fn process(&mut self, chunk: &mut pb::TraceChunk, index: usize);
182}
183
/// A stateless unit struct whose [`TraceChunkProcessor`] implementation does
/// nothing to the chunk.
#[derive(Default)]
pub struct DefaultTraceChunkProcessor;
189
190impl TraceChunkProcessor for DefaultTraceChunkProcessor {
191 fn process(&mut self, _chunk: &mut pb::TraceChunk, _index: usize) {
192 }
194}
195
196pub fn decode_to_trace_chunks(
228 data: libdd_tinybytes::Bytes,
229 encoding_type: TraceEncoding,
230) -> Result<(TraceChunks<BytesData>, usize), anyhow::Error> {
231 let (data, size) = match encoding_type {
232 TraceEncoding::V04 => msgpack_decoder::v04::from_bytes(data),
233 TraceEncoding::V05 => msgpack_decoder::v05::from_bytes(data),
234 }
235 .map_err(|e| anyhow::format_err!("Error deserializing trace from request body: {e}"))?;
236
237 Ok((
238 collect_trace_chunks(data, matches!(encoding_type, TraceEncoding::V05))?,
239 size,
240 ))
241}
242
#[cfg(test)]
mod tests {
    use super::*;
    use crate::span::v04::SpanBytes;
    use crate::test_utils::create_test_no_alloc_span;
    use libdd_tinybytes::BytesString;
    use libdd_trace_protobuf::pb;
    use serde_json::json;
    use std::collections::HashMap;

    /// Builds a `V07` collection holding one payload with a single empty chunk.
    fn create_dummy_collection_v07() -> TracerPayloadCollection {
        let chunk = pb::TraceChunk {
            priority: 0,
            origin: String::new(),
            spans: Vec::new(),
            tags: Default::default(),
            dropped_trace: false,
        };
        let payload = pb::TracerPayload {
            container_id: String::new(),
            language_name: String::new(),
            language_version: String::new(),
            tracer_version: String::new(),
            runtime_id: String::new(),
            chunks: vec![chunk],
            tags: Default::default(),
            env: String::new(),
            hostname: String::new(),
            app_version: String::new(),
        };
        TracerPayloadCollection::V07(vec![payload])
    }

    /// Builds a three-span trace from the shared span fixture.
    fn create_trace() -> Vec<SpanBytes> {
        let first = create_test_no_alloc_span(1234, 12341, 0, 1, true);
        let second = create_test_no_alloc_span(1234, 12342, 12341, 1, false);
        let third = create_test_no_alloc_span(1234, 12343, 12342, 1, false);
        vec![first, second, third]
    }

    /// Converts a string literal into a `BytesString`, panicking on failure.
    fn bytes_string(text: &str) -> BytesString {
        BytesString::from_slice(text.as_ref()).unwrap()
    }

    /// Shared append scenario: starting from one trace, appending one, then
    /// two, then zero traces must yield sizes 2, 4 and 4.
    fn check_append_sizes(
        make_single: fn() -> TracerPayloadCollection,
        mut empty: TracerPayloadCollection,
    ) {
        let mut pair = make_single();
        pair.append(&mut make_single());

        let mut accumulated = make_single();

        accumulated.append(&mut make_single());
        assert_eq!(2, accumulated.size());

        accumulated.append(&mut pair);
        assert_eq!(4, accumulated.size());

        accumulated.append(&mut empty);
        assert_eq!(4, accumulated.size());
    }

    #[test]
    fn test_append_traces_v07() {
        check_append_sizes(
            create_dummy_collection_v07,
            TracerPayloadCollection::V07(vec![]),
        );
    }

    #[test]
    fn test_append_traces_v04() {
        fn create_trace() -> TracerPayloadCollection {
            let span = create_test_no_alloc_span(0, 1, 0, 2, true);
            TracerPayloadCollection::V04(vec![vec![span]])
        }

        check_append_sizes(create_trace, TracerPayloadCollection::V04(vec![]));
    }

    #[test]
    fn test_merge_traces() {
        let mut collection = create_dummy_collection_v07();
        collection.append(&mut create_dummy_collection_v07());
        assert_eq!(2, collection.size());

        // Merging keeps both chunks but folds the two equal payloads into one.
        collection.merge();
        assert_eq!(2, collection.size());
        match collection {
            TracerPayloadCollection::V07(payloads) => assert_eq!(1, payloads.len()),
            _ => panic!("Unexpected type"),
        }
    }

    #[test]
    fn test_try_into_success() {
        let first_trace_json = json!([{
            "service": "test-service",
            "name": "test-service-name",
            "resource": "test-service-resource",
            "trace_id": 111,
            "span_id": 222,
            "parent_id": 100,
            "start": 1,
            "duration": 5,
            "error": 0,
            "meta": {},
            "metrics": {},
            "type": "serverless",
        }]);
        let first_trace_expected = vec![SpanBytes {
            service: bytes_string("test-service"),
            name: bytes_string("test-service-name"),
            resource: bytes_string("test-service-resource"),
            trace_id: 111,
            span_id: 222,
            parent_id: 100,
            start: 1,
            duration: 5,
            error: 0,
            meta: HashMap::new(),
            metrics: HashMap::new(),
            meta_struct: HashMap::new(),
            r#type: bytes_string("serverless"),
            span_links: vec![],
            span_events: vec![],
        }];

        let second_trace_json = json!([{
            "service": "test-service",
            "name": "test-service-name",
            "resource": "test-service-resource",
            "trace_id": 111,
            "span_id": 333,
            "parent_id": 100,
            "start": 1,
            "duration": 5,
            "error": 1,
            "meta": {},
            "metrics": {},
            "type": "",
        }]);
        let second_trace_expected = vec![SpanBytes {
            service: bytes_string("test-service"),
            name: bytes_string("test-service-name"),
            resource: bytes_string("test-service-resource"),
            trace_id: 111,
            span_id: 333,
            parent_id: 100,
            start: 1,
            duration: 5,
            error: 1,
            meta: HashMap::new(),
            metrics: HashMap::new(),
            meta_struct: HashMap::new(),
            r#type: BytesString::default(),
            span_links: vec![],
            span_events: vec![],
        }];

        let encoded = rmp_serde::to_vec(&vec![first_trace_json, second_trace_json])
            .expect("Failed to serialize test span.");
        let encoded = libdd_tinybytes::Bytes::from(encoded);

        let outcome = decode_to_trace_chunks(encoded, TraceEncoding::V04);
        assert!(outcome.is_ok());

        let (decoded, _) = outcome.unwrap();
        assert_eq!(2, decoded.size());

        match decoded {
            TraceChunks::V04(traces) => {
                assert_eq!(first_trace_expected, traces[0]);
                assert_eq!(second_trace_expected, traces[1]);
            }
            _ => panic!("Invalid collection type returned for try_into"),
        }
    }

    #[cfg_attr(miri, ignore)]
    #[test]
    fn test_try_into_empty() {
        // 0x90 is the msgpack encoding of an empty array.
        let empty_array: Vec<u8> = vec![0x90];
        let encoded = libdd_tinybytes::Bytes::from(empty_array);

        let outcome = decode_to_trace_chunks(encoded, TraceEncoding::V04);
        assert!(outcome.is_ok());

        let (decoded, _) = outcome.unwrap();
        assert_eq!(0, decoded.size());
    }

    #[test]
    fn test_try_into_meta_metrics_success() {
        let expected_trace = create_trace();
        let traces_to_encode = vec![create_trace()];
        let encoded = rmp_serde::to_vec_named(&traces_to_encode).unwrap();
        let encoded = libdd_tinybytes::Bytes::from(encoded);

        let outcome = decode_to_trace_chunks(encoded, TraceEncoding::V04);
        assert!(outcome.is_ok());

        let (decoded, _size) = outcome.unwrap();
        assert_eq!(1, decoded.size());
        match decoded {
            TraceChunks::V04(traces) => assert_eq!(expected_trace, traces[0]),
            _ => panic!("Invalid collection type returned for try_into"),
        }
    }
}