use crate::span::v05::dict::SharedDict;
use crate::span::{v04, v05, BytesData, SharedDictBytes, TraceData};
use crate::trace_utils::collect_trace_chunks;
use crate::{msgpack_decoder, trace_utils::cmp_send_data_payloads};
use libdd_trace_protobuf::pb;
use std::cmp::Ordering;
use std::iter::Iterator;
/// Alias for a list of v04 spans ([`v04::SpanBytes`]).
pub type TracerPayloadV04 = Vec<v04::SpanBytes>;
/// Alias for a list of v05 spans ([`v05::Span`]).
pub type TracerPayloadV05 = Vec<v05::Span>;
/// Selects which msgpack trace format [`decode_to_trace_chunks`] should decode.
#[derive(Debug, Clone, Copy)]
pub enum TraceEncoding {
/// Decode with `msgpack_decoder::v04`.
V04,
/// Decode with `msgpack_decoder::v05`.
V05,
}
/// Traces grouped by the span representation they are stored in.
///
/// In every variant the outer `Vec` is what [`TraceChunks::size`] counts.
#[derive(Debug)]
pub enum TraceChunks<T: TraceData> {
/// Traces stored as lists of [`v04::Span`].
V04(Vec<Vec<v04::Span<T>>>),
/// A [`SharedDict`] paired with traces stored as lists of [`v05::Span`].
V05((SharedDict<T::Text>, Vec<Vec<v05::Span>>)),
/// Traces stored as lists of [`v04::Span`]; `into_tracer_payload_collection` maps this
/// variant to `TracerPayloadCollection::V04`, exactly like [`TraceChunks::V04`].
V1(Vec<Vec<v04::Span<T>>>),
}
impl TraceChunks<BytesData> {
    /// Consumes the chunks and wraps them in the corresponding [`TracerPayloadCollection`].
    ///
    /// `V04` and `V1` both become `TracerPayloadCollection::V04`; `V05` is forwarded
    /// unchanged (dictionary included) as `TracerPayloadCollection::V05`.
    pub fn into_tracer_payload_collection(self) -> TracerPayloadCollection {
        match self {
            // Both variants carry the same v04 span layout, so they share one target.
            Self::V04(spans) | Self::V1(spans) => TracerPayloadCollection::V04(spans),
            Self::V05(dict_and_spans) => TracerPayloadCollection::V05(dict_and_spans),
        }
    }
}
impl<T: TraceData> TraceChunks<T> {
    /// Returns the number of traces held, i.e. the length of the outer `Vec` of the
    /// active variant.
    pub fn size(&self) -> usize {
        match self {
            Self::V04(traces) | Self::V1(traces) => traces.len(),
            // The dictionary does not contribute to the count.
            Self::V05((_dict, traces)) => traces.len(),
        }
    }
}
/// A batch of tracer payloads in one of the supported representations.
#[derive(Debug)]
pub enum TracerPayloadCollection {
/// Protobuf tracer payloads, each holding its own list of chunks.
V07(Vec<pb::TracerPayload>),
/// Traces stored as lists of [`v04::SpanBytes`].
V04(Vec<Vec<v04::SpanBytes>>),
/// A [`SharedDictBytes`] paired with traces stored as lists of [`v05::Span`].
V05((SharedDictBytes, Vec<Vec<v05::Span>>)),
}
impl TracerPayloadCollection {
    /// Moves every element of `other` into `self` when both hold the same variant.
    ///
    /// After a successful `V07`/`V07` or `V04`/`V04` append, `other` is left empty.
    /// If `self` is `V07` or `V04` and `other` is a different variant, nothing happens
    /// and `other` is left untouched.
    ///
    /// # Panics
    ///
    /// Panics if `self` is `V05`, whatever `other` is: appending is not implemented
    /// for that variant.
    pub fn append(&mut self, other: &mut Self) {
        match (self, other) {
            (Self::V07(dest), Self::V07(src)) => dest.append(src),
            (Self::V04(dest), Self::V04(src)) => dest.append(src),
            // The lint is silenced on purpose: V05 append is a known gap.
            #[allow(clippy::unimplemented)]
            (Self::V05(_), _) => unimplemented!("Append for V05 not implemented"),
            // Mismatched variants are silently ignored.
            (Self::V07(_), Self::V04(_) | Self::V05(_))
            | (Self::V04(_), Self::V07(_) | Self::V05(_)) => {}
        }
    }

    /// Collapses `V07` payloads that compare equal under `cmp_send_data_payloads`
    /// into a single payload carrying all of their chunks.
    ///
    /// The payloads are sorted first so that equal ones become adjacent. Other
    /// variants are left unchanged.
    pub fn merge(&mut self) {
        if let Self::V07(payloads) = self {
            payloads.sort_unstable_by(cmp_send_data_payloads);
            // `dedup_by` hands the closure (element to drop, element to keep).
            payloads.dedup_by(|dropped, kept| {
                let is_duplicate = cmp_send_data_payloads(dropped, kept) == Ordering::Equal;
                if is_duplicate {
                    // Rescue the chunks before the duplicate payload is removed.
                    kept.chunks.append(&mut dropped.chunks);
                }
                is_duplicate
            });
        }
    }

    /// Returns the number of items in the collection.
    ///
    /// For `V07` this is the total number of chunks across all payloads; for `V04`
    /// and `V05` it is the length of the outer trace list.
    pub fn size(&self) -> usize {
        match self {
            Self::V07(payloads) => payloads
                .iter()
                .fold(0, |total, payload| total + payload.chunks.len()),
            Self::V04(traces) => traces.len(),
            Self::V05((_dict, traces)) => traces.len(),
        }
    }
}
/// Hook that is handed trace chunks one at a time and may modify them in place.
pub trait TraceChunkProcessor {
/// Processes a single `chunk`, which the implementation may mutate.
///
/// `index` is supplied by the caller — presumably the chunk's position within its
/// payload; confirm against the call sites.
fn process(&mut self, chunk: &mut pb::TraceChunk, index: usize);
}
/// A [`TraceChunkProcessor`] whose `process` implementation does nothing.
#[derive(Default)]
pub struct DefaultTraceChunkProcessor;
impl TraceChunkProcessor for DefaultTraceChunkProcessor {
    /// Intentionally a no-op: the chunk is left untouched and the index is ignored.
    fn process(&mut self, _chunk: &mut pb::TraceChunk, _index: usize) {}
}
/// Decodes a msgpack-encoded request body into [`TraceChunks`].
///
/// `encoding_type` selects the decoder (`msgpack_decoder::v04` or
/// `msgpack_decoder::v05`) and is also forwarded to `collect_trace_chunks`.
///
/// # Returns
///
/// The collected chunks together with the `usize` reported by the decoder
/// (presumably the decoded payload size — confirm against `msgpack_decoder`).
///
/// # Errors
///
/// Returns an error if the payload cannot be deserialized, or if
/// `collect_trace_chunks` fails.
pub fn decode_to_trace_chunks(
    data: libdd_tinybytes::Bytes,
    encoding_type: TraceEncoding,
) -> Result<(TraceChunks<BytesData>, usize), anyhow::Error> {
    let decoded = match encoding_type {
        TraceEncoding::V04 => msgpack_decoder::v04::from_bytes(data),
        TraceEncoding::V05 => msgpack_decoder::v05::from_bytes(data),
    };

    let (traces, payload_size) = match decoded {
        Ok(parts) => parts,
        Err(e) => {
            return Err(anyhow::format_err!(
                "Error deserializing trace from request body: {e}"
            ))
        }
    };

    let chunks = collect_trace_chunks(traces, encoding_type)?;
    Ok((chunks, payload_size))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::span::v04::SpanBytes;
use crate::test_utils::create_test_no_alloc_span;
use libdd_tinybytes::BytesString;
use libdd_trace_protobuf::pb;
use serde_json::json;
use std::collections::HashMap;
/// Builds a `V07` collection holding one payload with one empty chunk; every other
/// field is empty or zero.
fn create_dummy_collection_v07() -> TracerPayloadCollection {
    let empty_chunk = pb::TraceChunk {
        priority: 0,
        origin: String::new(),
        spans: Vec::new(),
        tags: Default::default(),
        dropped_trace: false,
    };

    let payload = pb::TracerPayload {
        container_id: String::new(),
        language_name: String::new(),
        language_version: String::new(),
        tracer_version: String::new(),
        runtime_id: String::new(),
        chunks: vec![empty_chunk],
        tags: Default::default(),
        env: String::new(),
        hostname: String::new(),
        app_version: String::new(),
    };

    TracerPayloadCollection::V07(vec![payload])
}
/// Builds a three-span trace via `create_test_no_alloc_span`.
fn create_trace() -> Vec<SpanBytes> {
    // Second, third and fifth arguments of `create_test_no_alloc_span`, presumably
    // (span_id, parent_id, is_top_level) — confirm against the helper's signature.
    [
        (12341, 0, true),
        (12342, 12341, false),
        (12343, 12342, false),
    ]
    .iter()
    .map(|&(span_id, parent_id, top_level)| {
        create_test_no_alloc_span(1234, span_id, parent_id, 1, top_level)
    })
    .collect()
}
/// Appending `V07` collections accumulates their chunks; appending an empty one is a no-op.
#[test]
fn test_append_traces_v07() {
    // A source collection that already holds two payloads.
    let mut pair = create_dummy_collection_v07();
    pair.append(&mut create_dummy_collection_v07());

    let mut collection = create_dummy_collection_v07();
    collection.append(&mut create_dummy_collection_v07());
    assert_eq!(2, collection.size());

    collection.append(&mut pair);
    assert_eq!(4, collection.size());

    // An empty source must leave the size unchanged.
    let mut nothing = TracerPayloadCollection::V07(Vec::new());
    collection.append(&mut nothing);
    assert_eq!(4, collection.size());
}
/// Appending `V04` collections accumulates their traces; appending an empty one is a no-op.
#[test]
fn test_append_traces_v04() {
    // Produces a fresh collection containing a single one-span trace.
    let single_trace = || {
        TracerPayloadCollection::V04(vec![vec![create_test_no_alloc_span(0, 1, 0, 2, true)]])
    };

    // A source collection that already holds two traces.
    let mut pair = single_trace();
    pair.append(&mut single_trace());

    let mut collection = single_trace();
    collection.append(&mut single_trace());
    assert_eq!(2, collection.size());

    collection.append(&mut pair);
    assert_eq!(4, collection.size());

    // An empty source must leave the size unchanged.
    let mut nothing = TracerPayloadCollection::V04(Vec::new());
    collection.append(&mut nothing);
    assert_eq!(4, collection.size());
}
/// Merging two identical payloads keeps both chunks but leaves a single payload.
#[test]
fn test_merge_traces() {
    let mut collection = create_dummy_collection_v07();
    collection.append(&mut create_dummy_collection_v07());
    assert_eq!(2, collection.size());

    collection.merge();

    // The chunk count is preserved...
    assert_eq!(2, collection.size());
    // ...while the two equal payloads have been folded into one.
    match collection {
        TracerPayloadCollection::V07(payloads) => assert_eq!(1, payloads.len()),
        _ => panic!("Unexpected type"),
    }
}
/// Two one-span traces serialized as msgpack decode into two `V04` traces with the
/// expected span contents.
#[test]
fn test_try_into_success() {
    // JSON form of a one-span trace; only span id, error flag and type vary.
    let span_json = |span_id: u64, error: i32, span_type: &str| {
        json!([{
            "service": "test-service",
            "name": "test-service-name",
            "resource": "test-service-resource",
            "trace_id": 111,
            "span_id": span_id,
            "parent_id": 100,
            "start": 1,
            "duration": 5,
            "error": error,
            "meta": {},
            "metrics": {},
            "type": span_type,
        }])
    };

    // The decoded counterpart of `span_json`, with the same varying fields.
    let expected_trace = |span_id, error, span_type: BytesString| {
        vec![SpanBytes {
            service: BytesString::from_slice("test-service".as_ref()).unwrap(),
            name: BytesString::from_slice("test-service-name".as_ref()).unwrap(),
            resource: BytesString::from_slice("test-service-resource".as_ref()).unwrap(),
            trace_id: 111,
            span_id,
            parent_id: 100,
            start: 1,
            duration: 5,
            error,
            meta: HashMap::new(),
            metrics: HashMap::new(),
            meta_struct: HashMap::new(),
            r#type: span_type,
            span_links: vec![],
            span_events: vec![],
        }]
    };

    let expected_first = expected_trace(
        222,
        0,
        BytesString::from_slice("serverless".as_ref()).unwrap(),
    );
    // An empty "type" string is expected to decode to the default `BytesString`.
    let expected_second = expected_trace(333, 1, BytesString::default());

    let traces_json = vec![span_json(222, 0, "serverless"), span_json(333, 1, "")];
    let encoded = rmp_serde::to_vec(&traces_json).expect("Failed to serialize test span.");
    let payload = libdd_tinybytes::Bytes::from(encoded);

    let result = decode_to_trace_chunks(payload, TraceEncoding::V04);
    assert!(result.is_ok());
    let (chunks, _) = result.unwrap();
    assert_eq!(2, chunks.size());

    match chunks {
        TraceChunks::V04(traces) => {
            assert_eq!(expected_first, traces[0]);
            assert_eq!(expected_second, traces[1]);
        }
        _ => panic!("Invalid collection type returned for try_into"),
    }
}
/// An empty msgpack array decodes successfully into zero traces.
#[cfg_attr(miri, ignore)]
#[test]
fn test_try_into_empty() {
    // 0x90 is the MessagePack encoding of an empty array.
    let payload = libdd_tinybytes::Bytes::from(vec![0x90]);

    let result = decode_to_trace_chunks(payload, TraceEncoding::V04);
    assert!(result.is_ok());

    let (chunks, _) = result.unwrap();
    assert_eq!(0, chunks.size());
}
/// A trace serialized with `rmp_serde::to_vec_named` round-trips through the V04 decoder.
#[test]
fn test_try_into_meta_metrics_success() {
    let expected_trace = create_trace();

    // The payload is a list containing exactly one trace.
    let encoded = rmp_serde::to_vec_named(&vec![create_trace()]).unwrap();
    let payload = libdd_tinybytes::Bytes::from(encoded);

    let result = decode_to_trace_chunks(payload, TraceEncoding::V04);
    assert!(result.is_ok());

    let (chunks, _size) = result.unwrap();
    assert_eq!(1, chunks.size());

    match chunks {
        TraceChunks::V04(traces) => assert_eq!(expected_trace, traces[0]),
        _ => panic!("Invalid collection type returned for try_into"),
    }
}
}