1use crate::span::v05::dict::SharedDict;
5use crate::span::{v04, v05, BytesData, SharedDictBytes, TraceData};
6use crate::trace_utils::collect_trace_chunks;
7use crate::{msgpack_decoder, trace_utils::cmp_send_data_payloads};
8use libdd_trace_protobuf::pb;
9use std::cmp::Ordering;
10use std::iter::Iterator;
11
/// Alias for a flat list of [`v04::SpanBytes`].
pub type TracerPayloadV04 = Vec<v04::SpanBytes>;
/// Alias for a flat list of [`v05::Span`].
pub type TracerPayloadV05 = Vec<v05::Span>;
14
/// Trace payload encodings that [`decode_to_trace_chunks`] can decode.
///
/// The enum carries no data, so it is `Copy` and can be compared with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TraceEncoding {
    /// Payload decoded with `msgpack_decoder::v04`.
    V04,
    /// Payload decoded with `msgpack_decoder::v05`.
    V05,
}
23
24#[derive(Debug)]
25pub enum TraceChunks<T: TraceData> {
26 V04(Vec<Vec<v04::Span<T>>>),
28 V05((SharedDict<T::Text>, Vec<Vec<v05::Span>>)),
30}
31
32impl TraceChunks<BytesData> {
33 pub fn into_tracer_payload_collection(self) -> TracerPayloadCollection {
34 match self {
35 TraceChunks::V04(traces) => TracerPayloadCollection::V04(traces),
36 TraceChunks::V05(traces) => TracerPayloadCollection::V05(traces),
37 }
38 }
39}
40
41impl<T: TraceData> TraceChunks<T> {
42 pub fn size(&self) -> usize {
44 match self {
45 TraceChunks::V04(traces) => traces.len(),
46 TraceChunks::V05((_, traces)) => traces.len(),
47 }
48 }
49}
50
#[derive(Debug)]
/// A collection of tracer payloads in one of three representations.
pub enum TracerPayloadCollection {
    /// A list of protobuf [`pb::TracerPayload`] values.
    V07(Vec<pb::TracerPayload>),
    /// A list of traces, each trace being a list of [`v04::SpanBytes`].
    V04(Vec<Vec<v04::SpanBytes>>),
    /// A shared dictionary paired with a list of traces, each a list of [`v05::Span`].
    V05((SharedDictBytes, Vec<Vec<v05::Span>>)),
}
61
62impl TracerPayloadCollection {
63 pub fn append(&mut self, other: &mut Self) {
79 match self {
80 TracerPayloadCollection::V07(dest) => {
81 if let TracerPayloadCollection::V07(src) = other {
82 dest.append(src)
83 }
84 }
85 TracerPayloadCollection::V04(dest) => {
86 if let TracerPayloadCollection::V04(src) = other {
87 dest.append(src)
88 }
89 }
90 #[allow(clippy::unimplemented)]
92 TracerPayloadCollection::V05(_) => unimplemented!("Append for V05 not implemented"),
93 }
94 }
95
96 pub fn merge(&mut self) {
108 if let TracerPayloadCollection::V07(collection) = self {
109 collection.sort_unstable_by(cmp_send_data_payloads);
110 collection.dedup_by(|a, b| {
111 if cmp_send_data_payloads(a, b) == Ordering::Equal {
112 b.chunks.append(&mut a.chunks);
114 return true;
115 }
116 false
117 })
118 }
119 }
120
121 pub fn size(&self) -> usize {
136 match self {
137 TracerPayloadCollection::V07(collection) => {
138 collection.iter().map(|s| s.chunks.len()).sum()
139 }
140 TracerPayloadCollection::V04(collection) => collection.len(),
141 TracerPayloadCollection::V05((_, collection)) => collection.len(),
142 }
143 }
144}
145
/// Hook that is handed trace chunks one at a time and may modify them in place.
pub trait TraceChunkProcessor {
    /// Processes `chunk`, which may be mutated, together with a caller-supplied `index`.
    ///
    /// NOTE(review): `index` is presumably the chunk's position within its payload — confirm
    /// against the callers.
    fn process(&mut self, chunk: &mut pb::TraceChunk, index: usize);
}
178
/// A [`TraceChunkProcessor`] whose `process` does nothing.
///
/// The type is a stateless unit struct, so it also derives `Debug`, `Clone` and `Copy`.
#[derive(Debug, Default, Clone, Copy)]
pub struct DefaultTraceChunkProcessor;
184
impl TraceChunkProcessor for DefaultTraceChunkProcessor {
    /// No-op: both the chunk and the index are ignored.
    fn process(&mut self, _chunk: &mut pb::TraceChunk, _index: usize) {
    }
}
190
191pub fn decode_to_trace_chunks(
223 data: libdd_tinybytes::Bytes,
224 encoding_type: TraceEncoding,
225) -> Result<(TraceChunks<BytesData>, usize), anyhow::Error> {
226 let (data, size) = match encoding_type {
227 TraceEncoding::V04 => msgpack_decoder::v04::from_bytes(data),
228 TraceEncoding::V05 => msgpack_decoder::v05::from_bytes(data),
229 }
230 .map_err(|e| anyhow::format_err!("Error deserializing trace from request body: {e}"))?;
231
232 Ok((
233 collect_trace_chunks(data, matches!(encoding_type, TraceEncoding::V05))?,
234 size,
235 ))
236}
237
#[cfg(test)]
mod tests {
    use super::*;
    use crate::span::v04::SpanBytes;
    use crate::test_utils::create_test_no_alloc_span;
    use libdd_tinybytes::BytesString;
    use libdd_trace_protobuf::pb;
    use serde_json::json;
    use std::collections::HashMap;

    /// Builds a `V07` collection holding one payload with one chunk and no spans; every other
    /// field is empty or zero.
    fn create_dummy_collection_v07() -> TracerPayloadCollection {
        let empty_chunk = pb::TraceChunk {
            priority: 0,
            origin: String::new(),
            spans: Vec::new(),
            tags: Default::default(),
            dropped_trace: false,
        };
        let payload = pb::TracerPayload {
            container_id: String::new(),
            language_name: String::new(),
            language_version: String::new(),
            tracer_version: String::new(),
            runtime_id: String::new(),
            chunks: vec![empty_chunk],
            tags: Default::default(),
            env: String::new(),
            hostname: String::new(),
            app_version: String::new(),
        };
        TracerPayloadCollection::V07(vec![payload])
    }

    /// Builds a three-span trace with `create_test_no_alloc_span`; only the first span is
    /// created with the trailing flag set to `true`.
    fn create_trace() -> Vec<SpanBytes> {
        let trace_id = 1234;
        let first = create_test_no_alloc_span(trace_id, 12341, 0, 1, true);
        let second = create_test_no_alloc_span(trace_id, 12342, 12341, 1, false);
        let third = create_test_no_alloc_span(trace_id, 12343, 12342, 1, false);
        vec![first, second, third]
    }

    /// Appending `V07` collections adds up their chunk counts; appending an empty one is a no-op.
    #[test]
    fn test_append_traces_v07() {
        let mut trace = create_dummy_collection_v07();
        let mut single = create_dummy_collection_v07();
        trace.append(&mut single);
        assert_eq!(trace.size(), 2);

        let mut pair = create_dummy_collection_v07();
        pair.append(&mut create_dummy_collection_v07());
        trace.append(&mut pair);
        assert_eq!(trace.size(), 4);

        let mut empty = TracerPayloadCollection::V07(Vec::new());
        trace.append(&mut empty);
        assert_eq!(trace.size(), 4);
    }

    /// Appending `V04` collections adds up their trace counts; appending an empty one is a no-op.
    #[test]
    fn test_append_traces_v04() {
        fn single_trace_collection() -> TracerPayloadCollection {
            let span = create_test_no_alloc_span(0, 1, 0, 2, true);
            TracerPayloadCollection::V04(vec![vec![span]])
        }

        let mut trace = single_trace_collection();
        let mut single = single_trace_collection();
        trace.append(&mut single);
        assert_eq!(trace.size(), 2);

        let mut pair = single_trace_collection();
        pair.append(&mut single_trace_collection());
        trace.append(&mut pair);
        assert_eq!(trace.size(), 4);

        let mut empty = TracerPayloadCollection::V04(Vec::new());
        trace.append(&mut empty);
        assert_eq!(trace.size(), 4);
    }

    /// Merging two identical payloads keeps both chunks but leaves a single payload.
    #[test]
    fn test_merge_traces() {
        let mut trace = create_dummy_collection_v07();
        let mut duplicate = create_dummy_collection_v07();
        trace.append(&mut duplicate);
        assert_eq!(trace.size(), 2);

        trace.merge();
        assert_eq!(trace.size(), 2);

        match trace {
            TracerPayloadCollection::V07(collection) => assert_eq!(1, collection.len()),
            _ => panic!("Unexpected type"),
        }
    }

    /// Two single-span traces serialized from JSON decode into two `V04` chunks.
    #[test]
    fn test_try_into_success() {
        // The two spans differ only in span id, error value and type.
        let span_json = |span_id: u64, error: u64, span_type: &str| {
            json!([{
                "service": "test-service",
                "name": "test-service-name",
                "resource": "test-service-resource",
                "trace_id": 111,
                "span_id": span_id,
                "parent_id": 100,
                "start": 1,
                "duration": 5,
                "error": error,
                "meta": {},
                "metrics": {},
                "type": span_type,
            }])
        };

        let text = |s: &str| BytesString::from_slice(s.as_ref()).unwrap();
        let expected_span = |span_id, error, span_type| SpanBytes {
            service: text("test-service"),
            name: text("test-service-name"),
            resource: text("test-service-resource"),
            trace_id: 111,
            span_id,
            parent_id: 100,
            start: 1,
            duration: 5,
            error,
            meta: HashMap::new(),
            metrics: HashMap::new(),
            meta_struct: HashMap::new(),
            r#type: span_type,
            span_links: vec![],
            span_events: vec![],
        };

        let expected_first = vec![expected_span(222, 0, text("serverless"))];
        let expected_second = vec![expected_span(333, 1, BytesString::default())];

        let traces_json = vec![span_json(222, 0, "serverless"), span_json(333, 1, "")];
        let encoded = rmp_serde::to_vec(&traces_json).expect("Failed to serialize test span.");
        let payload = libdd_tinybytes::Bytes::from(encoded);

        let result = decode_to_trace_chunks(payload, TraceEncoding::V04);
        assert!(result.is_ok());

        let (chunks, _) = result.unwrap();
        assert_eq!(chunks.size(), 2);

        match chunks {
            TraceChunks::V04(traces) => {
                assert_eq!(expected_first, traces[0]);
                assert_eq!(expected_second, traces[1]);
            }
            TraceChunks::V05(_) => panic!("Invalid collection type returned for try_into"),
        }
    }

    /// An empty payload decodes successfully into zero chunks.
    #[cfg_attr(miri, ignore)]
    #[test]
    fn test_try_into_empty() {
        // 0x90 is the MessagePack encoding of an empty array.
        let payload = libdd_tinybytes::Bytes::from(vec![0x90]);

        let result = decode_to_trace_chunks(payload, TraceEncoding::V04);
        assert!(result.is_ok());

        let (collection, _) = result.unwrap();
        assert_eq!(collection.size(), 0);
    }

    /// A trace serialized with named fields round-trips through the V04 decoder.
    #[test]
    fn test_try_into_meta_metrics_success() {
        let expected_trace = create_trace();
        let encoded = rmp_serde::to_vec_named(&vec![create_trace()]).unwrap();
        let payload = libdd_tinybytes::Bytes::from(encoded);

        let result = decode_to_trace_chunks(payload, TraceEncoding::V04);
        assert!(result.is_ok());

        let (collection, _size) = result.unwrap();
        assert_eq!(collection.size(), 1);

        match collection {
            TraceChunks::V04(traces) => assert_eq!(expected_trace, traces[0]),
            TraceChunks::V05(_) => panic!("Invalid collection type returned for try_into"),
        }
    }
}
456}