mod span_v04;
use crate::span::v04::Span;
use crate::span::TraceData;
use crate::tracer_metadata::TracerMetadata;
use libdd_common::ResultInfallibleExt;
use rmp::encode::{
write_array_len, write_bin, write_map_len, write_sint, write_str, write_uint, write_uint8,
ByteBuf, RmpWrite, ValueWriteError,
};
use std::borrow::Borrow;
use std::collections::HashMap;
/// Integer keys of the entries in the top-level payload map written by
/// `encode_payload`.
mod trace_key {
/// Tracer language name, taken from `TracerMetadata::language`.
pub const LANGUAGE_NAME: u8 = 3;
/// Tracer language version, taken from `TracerMetadata::language_version`.
pub const LANGUAGE_VERSION: u8 = 4;
/// Tracer library version, taken from `TracerMetadata::tracer_version`.
pub const TRACER_VERSION: u8 = 5;
/// Runtime identifier, taken from `TracerMetadata::runtime_id`.
pub const RUNTIME_ID: u8 = 6;
/// Environment: metadata value, or the first span `env` tag when the metadata is empty.
pub const ENV_REF: u8 = 7;
/// Hostname: metadata value, or the first span `_dd.hostname` tag when the metadata is empty.
pub const HOSTNAME_REF: u8 = 8;
/// Application version: metadata value, or the first span `version` tag when the metadata is empty.
pub const APP_VERSION_REF: u8 = 9;
/// Payload-level attributes, written as a flat array of (key, value type, value) triples.
pub const ATTRIBUTES: u8 = 10;
/// Array of trace chunks; this entry is always written.
pub const CHUNKS: u8 = 11;
}
/// Integer keys of the entries in each chunk map written by `encode_chunk`.
mod chunk_key {
/// Sampling priority, written as a signed integer when present.
pub const PRIORITY: u8 = 1;
/// Origin string, written through the string table when present.
pub const ORIGIN: u8 = 2;
/// Array of the chunk's encoded spans; always written.
pub const SPANS: u8 = 4;
/// 128-bit trace id, written as 16 big-endian bytes; always written.
pub const TRACE_ID: u8 = 6;
/// Sampling mechanism, written as an unsigned integer when present.
pub const SAMPLING_MECHANISM: u8 = 7;
}
/// Remembers which strings have already been written to the output so that
/// repeated occurrences can be replaced by a numeric id (see
/// `StringTable::write_interned`).
pub(crate) struct StringTable {
// Maps every string seen so far to the id it was given on first occurrence.
seen: HashMap<String, u32>,
}
impl StringTable {
    /// Creates a table in which the empty string is already registered under id 0.
    fn new() -> Self {
        Self {
            seen: HashMap::from([(String::new(), 0)]),
        }
    }

    /// Writes `s` to `writer`, interning it in the table.
    ///
    /// The first time a string is seen it is registered under the next free
    /// id (the current table size) and written out in full as a MessagePack
    /// string. Every later occurrence is written as the unsigned integer id
    /// assigned on that first occurrence.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the underlying writer.
    pub(crate) fn write_interned<W: RmpWrite, S: AsRef<str>>(
        &mut self,
        writer: &mut W,
        s: S,
    ) -> Result<(), ValueWriteError<W::Error>> {
        let text = s.as_ref();
        let known_id = self.seen.get(text).copied();
        match known_id {
            Some(id) => {
                write_uint(writer, u64::from(id))?;
            }
            None => {
                // Register before writing, so the id is taken even if the write fails.
                let next_id = self.seen.len() as u32;
                self.seen.insert(text.to_owned(), next_id);
                write_str(writer, text)?;
            }
        }
        Ok(())
    }
}
/// Payload-level values gathered by `extract_payload_attrs`; each is `None`
/// when neither the tracer metadata nor any span provides it.
struct PayloadAttrs<'a> {
// Metadata `env`, or the span tag `env`.
env: Option<&'a str>,
// Metadata `hostname`, or the span tag `_dd.hostname`.
hostname: Option<&'a str>,
// Metadata `app_version`, or the span tag `version`.
app_version: Option<&'a str>,
// Span tag `_dd.apm_mode` (there is no metadata source for this one).
apm_mode: Option<&'a str>,
// Metadata `git_commit_sha`, or the span tag `_dd.git.commit.sha`.
git_commit_sha: Option<&'a str>,
}
/// Gathers the payload-level attributes for a batch of traces.
///
/// Non-empty values from `metadata` take precedence. Anything still missing
/// is filled from the first span (in iteration order over all traces) that
/// carries the corresponding tag. `apm_mode` has no metadata source and is
/// always looked up on the spans. The scan stops as soon as every attribute
/// has a value.
fn extract_payload_attrs<'a, T: TraceData + 'a, S: AsRef<[Span<T>]>>(
    traces: &'a [S],
    metadata: &'a TracerMetadata,
) -> PayloadAttrs<'a>
where
    T::Text: 'a,
{
    // Reads a string tag from a span's `meta` map.
    let tag = |span: &'a Span<T>, key: &str| -> Option<&'a str> {
        span.meta.get(key).map(|value| value.borrow())
    };

    // Seed from the tracer metadata; an empty string means "not provided".
    let mut env = if metadata.env.is_empty() {
        None
    } else {
        Some(metadata.env.as_str())
    };
    let mut hostname = if metadata.hostname.is_empty() {
        None
    } else {
        Some(metadata.hostname.as_str())
    };
    let mut app_version = if metadata.app_version.is_empty() {
        None
    } else {
        Some(metadata.app_version.as_str())
    };
    let mut git_commit_sha = if metadata.git_commit_sha.is_empty() {
        None
    } else {
        Some(metadata.git_commit_sha.as_str())
    };
    let mut apm_mode: Option<&'a str> = None;

    for span in traces.iter().flat_map(|trace| trace.as_ref().iter()) {
        // `or_else` keeps a value that is already set and only consults the
        // span when the slot is still empty.
        env = env.or_else(|| tag(span, "env"));
        hostname = hostname.or_else(|| tag(span, "_dd.hostname"));
        app_version = app_version.or_else(|| tag(span, "version"));
        apm_mode = apm_mode.or_else(|| tag(span, "_dd.apm_mode"));
        git_commit_sha = git_commit_sha.or_else(|| tag(span, "_dd.git.commit.sha"));

        let all_found = [env, hostname, app_version, apm_mode, git_commit_sha]
            .iter()
            .all(Option::is_some);
        if all_found {
            break;
        }
    }

    PayloadAttrs {
        env,
        hostname,
        app_version,
        apm_mode,
        git_commit_sha,
    }
}
/// Chunk-level values gathered from a chunk's spans by `extract_chunk_attrs`.
struct ChunkAttrs<'a> {
// First span's `trace_id` OR-ed with its `_dd.p.tid` tag (parsed as hex) shifted into the upper 64 bits.
trace_id: u128,
// Value of the `_sampling_priority_v1` metric, cast to `i32`.
sampling_priority: Option<i32>,
// Value of the `_dd.origin` tag.
origin: Option<&'a str>,
// Absolute value of the `_dd.p.dm` tag parsed as an `i32`.
sampling_mechanism: Option<u32>,
}
/// Derives the chunk-level attributes from the spans of one chunk.
///
/// * `trace_id` comes from the first span: its `trace_id` field OR-ed with
///   the `_dd.p.tid` tag (parsed as hexadecimal) shifted into the upper
///   64 bits. A missing or unparsable tag contributes zero; an empty chunk
///   yields a trace id of zero.
/// * Sampling priority, origin and sampling mechanism are taken from the
///   first span that is a root (`parent_id == 0`, or a `_dd.top_level`
///   metric equal to `1.0`). Until such a span is met, each value falls back
///   to the first span that carries it.
///
/// NOTE(review): when a root span is reached its values replace whatever was
/// collected from earlier spans, even if the root itself lacks the value.
/// Confirm this is the intended precedence.
fn extract_chunk_attrs<'a, T: TraceData>(spans: &'a [Span<T>]) -> ChunkAttrs<'a>
where
    T::Text: 'a,
{
    let trace_id = match spans.first() {
        Some(first) => {
            let upper_half = first
                .meta
                .get("_dd.p.tid")
                .and_then(|hex| u64::from_str_radix(hex.borrow(), 16).ok())
                .unwrap_or(0);
            (u128::from(upper_half) << 64) | first.trace_id
        }
        None => 0,
    };

    // Per-span readers for the three values that may live on any span.
    let priority_of = |span: &Span<T>| -> Option<i32> {
        span.metrics
            .get("_sampling_priority_v1")
            .map(|raw| *raw as i32)
    };
    let origin_of = |span: &'a Span<T>| -> Option<&'a str> {
        span.meta.get("_dd.origin").map(|text| text.borrow())
    };
    let mechanism_of = |span: &Span<T>| -> Option<u32> {
        span.meta
            .get("_dd.p.dm")
            .and_then(|text| text.borrow().parse::<i32>().ok())
            // The tag may be negative (e.g. "-4"); only the magnitude is kept.
            .map(i32::unsigned_abs)
    };

    let mut sampling_priority: Option<i32> = None;
    let mut origin: Option<&'a str> = None;
    let mut sampling_mechanism: Option<u32> = None;

    for span in spans {
        let is_root = span.parent_id == 0 || span.metrics.get("_dd.top_level") == Some(&1.0);
        if is_root {
            sampling_priority = priority_of(span);
            origin = origin_of(span);
            sampling_mechanism = mechanism_of(span);
            break;
        }

        // Not a root: only fill the slots that are still empty.
        sampling_priority = sampling_priority.or_else(|| priority_of(span));
        origin = origin.or_else(|| origin_of(span));
        sampling_mechanism = sampling_mechanism.or_else(|| mechanism_of(span));
    }

    ChunkAttrs {
        trace_id,
        sampling_priority,
        origin,
        sampling_mechanism,
    }
}
/// Encodes a whole payload (tracer metadata, payload attributes and all
/// chunks) as one MessagePack map.
///
/// Entries are written in ascending key order: the four tracer metadata
/// strings, the promoted `env` / `hostname` / `app_version` values, the
/// optional attributes array and finally the chunks array. Optional entries
/// are omitted entirely when they have no value. One string table is shared
/// by the whole payload, so a string is written in full only once.
///
/// # Errors
///
/// Propagates any error reported by the underlying writer.
fn encode_payload<W: RmpWrite, T: TraceData, S: AsRef<[Span<T>]>>(
    writer: &mut W,
    traces: &[S],
    metadata: &TracerMetadata,
) -> Result<(), ValueWriteError<W::Error>> {
    /// Treats an empty string as "absent".
    fn non_empty(text: &str) -> Option<&str> {
        (!text.is_empty()).then_some(text)
    }

    let mut table = StringTable::new();
    let promoted = extract_payload_attrs(traces, metadata);

    // Optional string entries, listed in the order they must be emitted.
    let string_fields: [(u8, Option<&str>); 7] = [
        (trace_key::LANGUAGE_NAME, non_empty(&metadata.language)),
        (
            trace_key::LANGUAGE_VERSION,
            non_empty(&metadata.language_version),
        ),
        (
            trace_key::TRACER_VERSION,
            non_empty(&metadata.tracer_version),
        ),
        (trace_key::RUNTIME_ID, non_empty(&metadata.runtime_id)),
        (trace_key::ENV_REF, promoted.env),
        (trace_key::HOSTNAME_REF, promoted.hostname),
        (trace_key::APP_VERSION_REF, promoted.app_version),
    ];
    // Optional payload attributes, also in emission order.
    let attributes: [(&str, Option<&str>); 2] = [
        ("_dd.apm_mode", promoted.apm_mode),
        ("_dd.git.commit.sha", promoted.git_commit_sha),
    ];

    let string_field_count = string_fields
        .iter()
        .filter(|(_, value)| value.is_some())
        .count() as u32;
    let attr_count = attributes
        .iter()
        .filter(|(_, value)| value.is_some())
        .count() as u32;

    // The leading 1 accounts for the chunks entry, which is always present.
    let map_len = 1 + string_field_count + u32::from(attr_count > 0);
    write_map_len(writer, map_len)?;

    for (key, value) in string_fields {
        if let Some(text) = value {
            write_uint8(writer, key)?;
            table.write_interned(writer, text)?;
        }
    }

    if attr_count > 0 {
        write_uint8(writer, trace_key::ATTRIBUTES)?;
        // Each attribute occupies three array slots: key, value type, value.
        write_array_len(writer, attr_count * 3)?;
        for (name, value) in attributes {
            if let Some(text) = value {
                table.write_interned(writer, name)?;
                write_uint8(writer, span_v04::AnyValueKey::String as u8)?;
                table.write_interned(writer, text)?;
            }
        }
    }

    write_uint8(writer, trace_key::CHUNKS)?;
    write_array_len(writer, traces.len() as u32)?;
    for trace in traces {
        encode_chunk(writer, trace.as_ref(), &mut table)?;
    }
    Ok(())
}
fn encode_chunk<W: RmpWrite, T: TraceData>(
writer: &mut W,
spans: &[Span<T>],
table: &mut StringTable,
) -> Result<(), ValueWriteError<W::Error>> {
let attrs = extract_chunk_attrs(spans);
let fields = 2u32 + attrs.origin.is_some() as u32
+ attrs.sampling_priority.is_some() as u32
+ attrs.sampling_mechanism.is_some() as u32;
write_map_len(writer, fields)?;
write_uint8(writer, chunk_key::TRACE_ID)?;
write_bin(writer, &attrs.trace_id.to_be_bytes())?;
if let Some(origin) = attrs.origin {
write_uint8(writer, chunk_key::ORIGIN)?;
table.write_interned(writer, origin)?;
}
if let Some(priority) = attrs.sampling_priority {
write_uint8(writer, chunk_key::PRIORITY)?;
write_sint(writer, priority as i64)?;
}
if let Some(mechanism) = attrs.sampling_mechanism {
write_uint8(writer, chunk_key::SAMPLING_MECHANISM)?;
write_uint(writer, mechanism as u64)?;
}
write_uint8(writer, chunk_key::SPANS)?;
write_array_len(writer, spans.len() as u32)?;
for span in spans {
span_v04::encode_span(writer, span, table)?;
}
Ok(())
}
/// Encodes `traces` and `metadata` into the caller-provided byte slice.
///
/// NOTE(review): presumably `slice` is advanced past the bytes written, as
/// with `std::io::Write` for `&mut [u8]` — confirm against the writer impl.
///
/// # Errors
///
/// Returns the writer's error if encoding fails (presumably including the
/// case where the slice is too small — confirm).
pub fn write_to_slice<T: TraceData, S: AsRef<[Span<T>]>>(
slice: &mut &mut [u8],
traces: &[S],
metadata: &TracerMetadata,
) -> Result<(), ValueWriteError> {
encode_payload(slice, traces, metadata)
}
/// Encodes `traces` and `metadata` into a freshly allocated byte vector.
///
/// Equivalent to `to_vec_with_capacity` with an initial capacity of zero.
pub fn to_vec<T: TraceData, S: AsRef<[Span<T>]>>(
traces: &[S],
metadata: &TracerMetadata,
) -> Vec<u8> {
to_vec_with_capacity(traces, 0, metadata)
}
/// Encodes `traces` and `metadata` into a byte vector whose buffer is
/// created with room for `capacity` bytes.
///
/// Encoding into the in-memory buffer is treated as infallible: the error
/// is flattened with `flatten_value_write_infallible` and unwrapped with
/// `unwrap_infallible`.
pub fn to_vec_with_capacity<T: TraceData, S: AsRef<[Span<T>]>>(
    traces: &[S],
    capacity: u32,
    metadata: &TracerMetadata,
) -> Vec<u8> {
    let mut out = ByteBuf::with_capacity(capacity as usize);
    let outcome = encode_payload(&mut out, traces, metadata);
    outcome
        .map_err(super::flatten_value_write_infallible)
        .unwrap_infallible();
    out.into_vec()
}
/// Returns the number of bytes the encoded payload would occupy, without
/// producing the payload itself.
///
/// The payload is encoded into a `CountLength` sink and the counter it
/// holds is returned.
pub fn to_encoded_byte_len<T: TraceData, S: AsRef<[Span<T>]>>(
    traces: &[S],
    metadata: &TracerMetadata,
) -> u32 {
    let mut byte_count = super::CountLength(0);
    // The encoding result is deliberately discarded.
    // NOTE(review): presumably writes to `CountLength` cannot fail — confirm.
    let _ = encode_payload(&mut byte_count, traces, metadata);
    byte_count.0
}
#[cfg(test)]
mod tests {
use super::*;
use crate::span::v04::SpanBytes;
use libdd_tinybytes::BytesString;
use std::collections::HashMap;
/// Builds a span with the given identity fields, the resource `res`, a start
/// of 1_000_000 and a duration of 500. Every other field is left at its
/// default.
fn make_span(
    service: &str,
    name: &str,
    trace_id: u128,
    span_id: u64,
    parent_id: u64,
) -> SpanBytes {
    let text = |raw: &str| BytesString::from_slice(raw.as_bytes()).unwrap();
    SpanBytes {
        service: text(service),
        name: text(name),
        resource: text("res"),
        trace_id,
        span_id,
        parent_id,
        start: 1_000_000,
        duration: 500,
        ..Default::default()
    }
}
/// A payload holding a single root span encodes to a non-empty buffer.
#[test]
fn test_to_vec_non_empty() {
    let traces = vec![vec![make_span("svc", "op", 42, 1, 0)]];
    let payload = to_vec(&traces, &TracerMetadata::default());
    assert!(!payload.is_empty());
}
/// Even with no traces at all, the payload map itself is still emitted.
#[test]
fn test_to_vec_empty_traces() {
    let no_traces = Vec::<Vec<SpanBytes>>::new();
    let payload = to_vec(&no_traces, &TracerMetadata::default());
    assert!(!payload.is_empty());
}
/// Two chunks that share strings must encode to less than twice the size of
/// a single chunk, because repeated strings are written as references.
#[test]
fn test_string_interning_reduces_size() {
    let metadata = TracerMetadata::default();
    let two_chunks = vec![
        vec![make_span("my-service", "op1", 1, 1, 0)],
        vec![make_span("my-service", "op2", 2, 2, 0)],
    ];
    let one_chunk = vec![vec![make_span("my-service", "op1", 1, 1, 0)]];

    let two_len = to_vec(&two_chunks, &metadata).len();
    let single_len = to_vec(&one_chunk, &metadata).len();

    assert!(
        two_len < 2 * single_len,
        "Interning should reduce size: two={} single={}",
        two_len,
        single_len
    );
}
/// The root span's `_dd.origin` tag and `_sampling_priority_v1` metric are
/// both promoted to chunk level.
#[test]
fn test_chunk_level_attrs_origin_and_priority() {
    let mut meta = HashMap::new();
    meta.insert(
        BytesString::from_static("_dd.origin"),
        BytesString::from_static("lambda"),
    );
    let mut metrics = HashMap::new();
    metrics.insert(BytesString::from_static("_sampling_priority_v1"), 1.0f64);
    let root = SpanBytes {
        service: BytesString::from_slice(b"svc").unwrap(),
        name: BytesString::from_slice(b"op").unwrap(),
        resource: BytesString::from_slice(b"res").unwrap(),
        trace_id: 99,
        span_id: 1,
        parent_id: 0,
        start: 1000,
        duration: 100,
        meta,
        metrics,
        ..Default::default()
    };
    let encoded = to_vec(&[vec![root]], &TracerMetadata::default());
    assert!(!encoded.is_empty());

    let lambda_bytes = b"lambda";
    assert!(
        encoded
            .windows(lambda_bytes.len())
            .any(|w| w == lambda_bytes),
        "origin 'lambda' should appear in payload"
    );

    // The test name promises a priority check as well: the chunk key must be
    // followed directly by the priority value, encoded as a single byte.
    let prio = [chunk_key::PRIORITY, 0x01];
    assert!(
        encoded.windows(2).any(|w| w == prio),
        "sampling_priority 1 from the root span should appear"
    );
}
/// The length reported by the counting encoder equals the size of the
/// buffer produced by the real encoder.
#[test]
fn test_to_encoded_byte_len_matches_to_vec() {
    let metadata = TracerMetadata::default();
    let traces = vec![vec![
        make_span("svc", "op", 1, 1, 0),
        make_span("svc", "child", 1, 2, 1),
    ]];

    let actual_len = to_vec(&traces, &metadata).len() as u32;
    let counted_len = to_encoded_byte_len(&traces, &metadata);

    assert_eq!(actual_len, counted_len);
}
/// A span with a non-zero (remote) parent but `_dd.top_level == 1` still
/// encodes successfully.
#[test]
fn test_remote_parent_root_span_top_level() {
    let metrics = HashMap::from([
        (BytesString::from_static("_dd.top_level"), 1.0f64),
        (BytesString::from_static("_sampling_priority_v1"), 2.0f64),
    ]);
    let root = SpanBytes {
        start: 1000,
        duration: 100,
        metrics,
        ..make_span("svc", "op", 123, 42, 999)
    };

    let payload = to_vec(&[vec![root]], &TracerMetadata::default());
    assert!(!payload.is_empty());
}
/// With empty tracer metadata, the span tags `env`, `version` and
/// `_dd.hostname` are promoted to payload level.
#[test]
fn test_payload_promoted_fields() {
    let mut meta = HashMap::new();
    meta.insert(
        BytesString::from_static("env"),
        BytesString::from_static("prod"),
    );
    meta.insert(
        BytesString::from_static("version"),
        BytesString::from_static("1.2.3"),
    );
    meta.insert(
        BytesString::from_static("_dd.hostname"),
        BytesString::from_static("my-host"),
    );
    let span = SpanBytes {
        service: BytesString::from_slice(b"svc").unwrap(),
        name: BytesString::from_slice(b"op").unwrap(),
        resource: BytesString::from_slice(b"res").unwrap(),
        trace_id: 1,
        span_id: 1,
        parent_id: 0,
        start: 1000,
        duration: 100,
        meta,
        ..Default::default()
    };
    let encoded = to_vec(&[vec![span]], &TracerMetadata::default());

    let prod_bytes = b"prod";
    assert!(
        encoded.windows(prod_bytes.len()).any(|w| w == prod_bytes),
        "env 'prod' should appear in payload"
    );
    // The `version` tag is set above, so it must be checked as well.
    let version_bytes = b"1.2.3";
    assert!(
        encoded
            .windows(version_bytes.len())
            .any(|w| w == version_bytes),
        "version '1.2.3' should appear in payload"
    );
    let host_bytes = b"my-host";
    assert!(
        encoded.windows(host_bytes.len()).any(|w| w == host_bytes),
        "hostname 'my-host' should appear in payload"
    );
}
/// The `_dd.apm_mode` and `_dd.git.commit.sha` span tags appear in the
/// payload, both as keys and as values.
#[test]
fn test_payload_attributes_apm_mode_and_git_commit_sha() {
    let meta = HashMap::from([
        (
            BytesString::from_static("_dd.apm_mode"),
            BytesString::from_static("ssi"),
        ),
        (
            BytesString::from_static("_dd.git.commit.sha"),
            BytesString::from_static("abc123"),
        ),
    ]);
    let span = SpanBytes {
        start: 1000,
        duration: 100,
        meta,
        ..make_span("svc", "op", 1, 1, 0)
    };
    let encoded = to_vec(&[vec![span]], &TracerMetadata::default());

    // True when `needle` occurs anywhere in the encoded payload.
    let found = |needle: &[u8]| encoded.windows(needle.len()).any(|w| w == needle);

    assert!(found(b"ssi"), "apm_mode 'ssi' should appear in payload");
    assert!(
        found(b"abc123"),
        "git commit sha 'abc123' should appear in payload"
    );
    assert!(
        found(b"_dd.apm_mode"),
        "_dd.apm_mode key should appear in payload"
    );
    assert!(
        found(b"_dd.git.commit.sha"),
        "_dd.git.commit.sha key should appear in payload"
    );
}
/// When no span carries `_dd.apm_mode` or `_dd.git.commit.sha`, neither
/// attribute key may appear in the payload.
#[test]
fn test_payload_attributes_absent_when_no_relevant_tags() {
    let span = make_span("svc", "op", 1, 1, 0);
    let encoded = to_vec(&[vec![span]], &TracerMetadata::default());

    let apm_key = b"_dd.apm_mode";
    assert!(
        !encoded.windows(apm_key.len()).any(|w| w == apm_key),
        "key 10 should be absent when no relevant tags are set"
    );
    // The attributes entry has two possible members; check the second one too.
    let git_key = b"_dd.git.commit.sha";
    assert!(
        !encoded.windows(git_key.len()).any(|w| w == git_key),
        "_dd.git.commit.sha key should be absent when no relevant tags are set"
    );
}
/// Every non-empty tracer metadata string is written into the payload.
#[test]
fn test_payload_metadata_fields_present() {
    let metadata = TracerMetadata {
        language: "python".to_string(),
        language_version: "3.11".to_string(),
        tracer_version: "2.0.0".to_string(),
        runtime_id: "abc-123-uuid".to_string(),
        ..Default::default()
    };
    let encoded = to_vec(&[vec![make_span("svc", "op", 1, 1, 0)]], &metadata);

    for expected in ["python", "3.11", "2.0.0", "abc-123-uuid"] {
        let needle = expected.as_bytes();
        assert!(
            encoded.windows(needle.len()).any(|w| w == needle),
            "{} should appear in payload",
            expected
        );
    }
}
/// A payload with a non-empty `language` is larger than the same payload
/// with default (empty) metadata, i.e. empty metadata fields are omitted.
#[test]
fn test_payload_metadata_absent_when_empty() {
    let span = make_span("svc", "op", 1, 1, 0);
    let with_language = TracerMetadata {
        language: "go".to_string(),
        ..Default::default()
    };

    let encoded_without = to_vec(&[vec![span.clone()]], &TracerMetadata::default());
    let encoded_with = to_vec(&[vec![span]], &with_language);

    assert!(encoded_with.len() > encoded_without.len());
}
/// The `_dd.p.tid` tag supplies the upper 64 bits of the chunk's trace id
/// and is not itself written as a span attribute.
#[test]
fn test_128bit_trace_id_from_dd_p_tid() {
    let meta = HashMap::from([(
        BytesString::from_static("_dd.p.tid"),
        BytesString::from_static("640cfd5400000000"),
    )]);
    let span = SpanBytes {
        start: 1000,
        duration: 100,
        meta,
        ..make_span("svc", "op", 0x0123456789abcdef, 1, 0)
    };
    let encoded = to_vec(&[vec![span]], &TracerMetadata::default());

    // Upper half from the tag, lower half from the span, big-endian.
    let expected = 0x640cfd54000000000123456789abcdef_u128.to_be_bytes();
    assert!(
        encoded.windows(16).any(|w| w == expected),
        "128-bit trace_id big-endian bytes should appear in payload"
    );

    let tid_key = b"_dd.p.tid";
    let tid_key_present = encoded.windows(tid_key.len()).any(|w| w == tid_key);
    assert!(
        !tid_key_present,
        "_dd.p.tid should be consumed, not encoded as a span attribute"
    );
}
/// Without a `_dd.p.tid` tag, the upper 64 bits of the trace id are zero.
#[test]
fn test_128bit_trace_id_without_dd_p_tid() {
    let low_half: u64 = 0x0123456789abcdef;
    let span = make_span("svc", "op", u128::from(low_half), 1, 0);
    let encoded = to_vec(&[vec![span]], &TracerMetadata::default());

    // Widening to u128 leaves the eight leading big-endian bytes at zero.
    let expected = u128::from(low_half).to_be_bytes();
    assert!(
        encoded.windows(16).any(|w| w == expected),
        "absent _dd.p.tid should yield zero high 64 bits"
    );
}
/// A negative `_dd.p.dm` tag is encoded as its absolute value.
#[test]
fn test_sampling_mechanism_negative_value() {
    let meta = HashMap::from([(
        BytesString::from_static("_dd.p.dm"),
        BytesString::from_static("-4"),
    )]);
    let root = SpanBytes {
        start: 1000,
        duration: 100,
        meta,
        ..make_span("svc", "op", 1, 1, 0)
    };
    let encoded = to_vec(&[vec![root]], &TracerMetadata::default());

    // Chunk key immediately followed by the single-byte value 4.
    let key_then_value = [chunk_key::SAMPLING_MECHANISM, 0x04];
    let present = encoded.windows(2).any(|w| w == key_then_value);
    assert!(
        present,
        "sampling_mechanism should be encoded as unsigned_abs(\"-4\") = 4"
    );
}
/// When no span in the chunk is a root, each chunk attribute is taken from
/// the first span that carries it: origin from span 1, sampling priority
/// from span 2 and sampling mechanism from span 3.
#[test]
fn test_chunk_attrs_fallback_no_root_span() {
    // Non-root span skeleton shared by the three spans below.
    let child = |name: &str, span_id: u64, parent_id: u64| SpanBytes {
        start: 1000,
        duration: 100,
        ..make_span("svc", name, 1, span_id, parent_id)
    };

    let s1 = SpanBytes {
        meta: HashMap::from([(
            BytesString::from_static("_dd.origin"),
            BytesString::from_static("lambda"),
        )]),
        ..child("op1", 11, 10)
    };
    let s2 = SpanBytes {
        metrics: HashMap::from([(
            BytesString::from_static("_sampling_priority_v1"),
            2.0f64,
        )]),
        ..child("op2", 12, 11)
    };
    let s3 = SpanBytes {
        meta: HashMap::from([(
            BytesString::from_static("_dd.p.dm"),
            BytesString::from_static("-3"),
        )]),
        ..child("op3", 13, 12)
    };

    let encoded = to_vec(&[vec![s1, s2, s3]], &TracerMetadata::default());
    // True when `needle` occurs anywhere in the encoded payload.
    let found = |needle: &[u8]| encoded.windows(needle.len()).any(|w| w == needle);

    assert!(
        found(b"lambda"),
        "origin 'lambda' from span 1 should appear in payload"
    );
    assert!(
        found(&[chunk_key::PRIORITY, 0x02]),
        "sampling_priority 2 from span 2 should appear"
    );
    assert!(
        found(&[chunk_key::SAMPLING_MECHANISM, 0x03]),
        "sampling_mechanism 3 from span 3 should appear"
    );
}
}