Skip to main content

libdd_trace_utils/msgpack_encoder/v1/mod.rs

1// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4mod span_v04;
5
6use crate::span::v04::Span;
7use crate::span::TraceData;
8use crate::tracer_metadata::TracerMetadata;
9use libdd_common::ResultInfallibleExt;
10use rmp::encode::{
11    write_array_len, write_bin, write_map_len, write_sint, write_str, write_uint, write_uint8,
12    ByteBuf, RmpWrite, ValueWriteError,
13};
14use std::borrow::Borrow;
15use std::collections::HashMap;
16
/// Integer keys of the top-level map in a V1 trace payload.
///
/// String-valued entries are written through the payload's string table, so they appear
/// on the wire either as a msgpack `str` (first use) or as a `uint` string ID.
mod trace_key {
    // --- Tracer / runtime identification (from `TracerMetadata`) ---

    /// Tracer language name.
    pub const LANGUAGE_NAME: u8 = 3;
    /// Language runtime version.
    pub const LANGUAGE_VERSION: u8 = 4;
    /// Tracer library version.
    pub const TRACER_VERSION: u8 = 5;
    /// Runtime identifier.
    pub const RUNTIME_ID: u8 = 6;

    // --- Values promoted to the payload level (metadata first, span tags as fallback) ---

    /// Environment (`env`).
    pub const ENV_REF: u8 = 7;
    /// Hostname (`_dd.hostname`).
    pub const HOSTNAME_REF: u8 = 8;
    /// Application version (`version`).
    pub const APP_VERSION_REF: u8 = 9;
    /// Payload-level attributes, e.g. `_dd.apm_mode` and `_dd.git.commit.sha`.
    pub const ATTRIBUTES: u8 = 10;

    // --- Body ---

    /// Array of trace chunks.
    pub const CHUNKS: u8 = 11;
}
30
/// Integer keys of the per-chunk map in a V1 trace payload.
mod chunk_key {
    /// Sampling priority (from the `_sampling_priority_v1` span metric).
    pub const PRIORITY: u8 = 1;
    /// Trace origin (from the `_dd.origin` span tag); interned string.
    pub const ORIGIN: u8 = 2;
    /// Array of the chunk's spans.
    pub const SPANS: u8 = 4;
    /// 128-bit trace ID, written as 16 big-endian bytes.
    pub const TRACE_ID: u8 = 6;
    /// Sampling mechanism; carries what v04 stored in the `_dd.p.dm` span tag.
    pub const SAMPLING_MECHANISM: u8 = 7;
}
40
41/// Streaming string intern table.
42///
43/// The first time a string is written, it is emitted as a msgpack `str` and assigned an
44/// incrementing integer ID. On subsequent occurrences only the ID is emitted as a msgpack `uint`.
45/// ID 0 is reserved for the empty string (pre-inserted in the constructor).
46///
47/// The string table is scoped per payload: each `to_vec` / `write_to_slice` call starts with a
48/// fresh table so deduplication is payload-local.
49pub(crate) struct StringTable {
50    seen: HashMap<String, u32>,
51}
52
53impl StringTable {
54    fn new() -> Self {
55        let mut seen = HashMap::new();
56        seen.insert(String::new(), 0);
57        Self { seen }
58    }
59
60    /// Writes `s` to `writer` using string interning.
61    ///
62    /// - First occurrence of `s` → msgpack `str`, ID recorded for future references
63    /// - Subsequent occurrence → msgpack `uint` carrying the previously assigned ID
64    pub(crate) fn write_interned<W: RmpWrite, S: AsRef<str>>(
65        &mut self,
66        writer: &mut W,
67        s: S,
68    ) -> Result<(), ValueWriteError<W::Error>> {
69        let s = s.as_ref();
70        if let Some(&id) = self.seen.get(s) {
71            write_uint(writer, id as u64)?;
72        } else {
73            let id = self.seen.len() as u32;
74            self.seen.insert(s.to_string(), id);
75            write_str(writer, s)?;
76        }
77        Ok(())
78    }
79}
80
81/// Promoted fields extracted from the payload's spans, written at the top-level map.
82struct PayloadAttrs<'a> {
83    env: Option<&'a str>,
84    hostname: Option<&'a str>,
85    app_version: Option<&'a str>,
86    /// `_dd.apm_mode` span tag, promoted to payload-level attributes.
87    apm_mode: Option<&'a str>,
88    /// `_dd.git.commit.sha` span tag, promoted to payload-level attributes.
89    git_commit_sha: Option<&'a str>,
90}
91
92fn extract_payload_attrs<'a, T: TraceData + 'a, S: AsRef<[Span<T>]>>(
93    traces: &'a [S],
94    metadata: &'a TracerMetadata,
95) -> PayloadAttrs<'a>
96where
97    T::Text: 'a,
98{
99    // Prefer TracerMetadata (set once on the builder) over span scanning. Fall back to
100    // span meta only when the builder-level value is missing — e.g. v04 payloads where
101    // the SDK propagated these as span tags.
102    let mut env = (!metadata.env.is_empty()).then_some(metadata.env.as_str());
103    let mut hostname = (!metadata.hostname.is_empty()).then_some(metadata.hostname.as_str());
104    let mut app_version =
105        (!metadata.app_version.is_empty()).then_some(metadata.app_version.as_str());
106    let mut git_commit_sha =
107        (!metadata.git_commit_sha.is_empty()).then_some(metadata.git_commit_sha.as_str());
108    let mut apm_mode = None;
109
110    'outer: for trace in traces {
111        for span in trace.as_ref() {
112            if env.is_none() {
113                env = span.meta.get("env").map(|v| v.borrow());
114            }
115            if hostname.is_none() {
116                hostname = span.meta.get("_dd.hostname").map(|v| v.borrow());
117            }
118            if app_version.is_none() {
119                app_version = span.meta.get("version").map(|v| v.borrow());
120            }
121            if apm_mode.is_none() {
122                apm_mode = span.meta.get("_dd.apm_mode").map(|v| v.borrow());
123            }
124            if git_commit_sha.is_none() {
125                git_commit_sha = span.meta.get("_dd.git.commit.sha").map(|v| v.borrow());
126            }
127            if env.is_some()
128                && hostname.is_some()
129                && app_version.is_some()
130                && apm_mode.is_some()
131                && git_commit_sha.is_some()
132            {
133                break 'outer;
134            }
135        }
136    }
137
138    PayloadAttrs {
139        env,
140        hostname,
141        app_version,
142        apm_mode,
143        git_commit_sha,
144    }
145}
146
147/// Promoted fields extracted from spans and written at the chunk level.
148struct ChunkAttrs<'a> {
149    /// Full 128-bit trace ID (encodes as 16-byte big-endian binary).
150    trace_id: u128,
151    /// Sampling priority from `_sampling_priority_v1` metric on the root span.
152    sampling_priority: Option<i32>,
153    /// Origin tag from `_dd.origin` meta on the root span.
154    origin: Option<&'a str>,
155    /// Sampling mechanism from `_dd.p.dm` meta on the root span.
156    sampling_mechanism: Option<u32>,
157}
158
159fn extract_chunk_attrs<'a, T: TraceData>(spans: &'a [Span<T>]) -> ChunkAttrs<'a>
160where
161    T::Text: 'a,
162{
163    // trace_id is invariant per chunk. The v04 wire format carries only the low 64 bits;
164    // the high 64 bits are propagated as the hex string meta tag "_dd.p.tid".
165    let trace_id = spans
166        .first()
167        .map(|s| {
168            let high = s
169                .meta
170                .get("_dd.p.tid")
171                .and_then(|v| u64::from_str_radix(v.borrow(), 16).ok())
172                .unwrap_or(0);
173            ((high as u128) << 64) | s.trace_id
174        })
175        .unwrap_or(0);
176
177    let mut sampling_priority = None;
178    let mut origin = None;
179    let mut sampling_mechanism = None;
180
181    for span in spans {
182        // Root span: either no parent in this chunk, or tagged _dd.top_level=1 (remote parent).
183        let is_root =
184            span.parent_id == 0 || span.metrics.get("_dd.top_level").copied().unwrap_or(0.0) == 1.0;
185
186        if is_root {
187            // Root span is authoritative: its values supersede any non-root fallback,
188            // including absence (a field missing on the root should not be filled from non-roots).
189            sampling_priority = span.metrics.get("_sampling_priority_v1").map(|v| *v as i32);
190            origin = span.meta.get("_dd.origin").map(|v| v.borrow());
191            // _dd.p.dm is a signed integer stored as a string; unsigned_abs preserves the
192            // magnitude.
193            sampling_mechanism = span
194                .meta
195                .get("_dd.p.dm")
196                .and_then(|v| v.borrow().parse::<i32>().ok())
197                .map(|dm| dm.unsigned_abs());
198            break;
199        }
200
201        // No root found yet — accumulate fallback values from non-root spans (partial flush).
202        // Root span values will override these if a root is eventually encountered.
203        if sampling_priority.is_none() {
204            sampling_priority = span.metrics.get("_sampling_priority_v1").map(|v| *v as i32);
205        }
206        if origin.is_none() {
207            origin = span.meta.get("_dd.origin").map(|v| v.borrow());
208        }
209        if sampling_mechanism.is_none() {
210            sampling_mechanism = span
211                .meta
212                .get("_dd.p.dm")
213                .and_then(|v| v.borrow().parse::<i32>().ok())
214                .map(|dm| dm.unsigned_abs());
215        }
216    }
217
218    ChunkAttrs {
219        trace_id,
220        sampling_priority,
221        origin,
222        sampling_mechanism,
223    }
224}
225
226/// Encodes all traces as a V1 msgpack payload.
227///
228/// Top-level format:
229/// ```text
230/// Map {
231///   trace_key::ENV_REF      (7)  → str|uint       // optional, interned
232///   trace_key::HOSTNAME_REF (8)  → str|uint       // optional, interned
233///   trace_key::APP_VERSION  (9)  → str|uint       // optional, interned
234///   trace_key::ATTRIBUTES   (10) → Array[...]     // optional, flat triplets: key, type, value
235///   trace_key::CHUNKS       (11) → Array[Chunk, ...]
236/// }
237/// ```
238fn encode_payload<W: RmpWrite, T: TraceData, S: AsRef<[Span<T>]>>(
239    writer: &mut W,
240    traces: &[S],
241    metadata: &TracerMetadata,
242) -> Result<(), ValueWriteError<W::Error>> {
243    let mut table = StringTable::new();
244    let payload_attrs = extract_payload_attrs(traces, metadata);
245
246    let attr_count =
247        payload_attrs.apm_mode.is_some() as u32 + payload_attrs.git_commit_sha.is_some() as u32;
248    let has_attributes = attr_count > 0;
249
250    let map_len = 1u32 // chunks always present
251        + (!metadata.language.is_empty()) as u32
252        + (!metadata.language_version.is_empty()) as u32
253        + (!metadata.tracer_version.is_empty()) as u32
254        + (!metadata.runtime_id.is_empty()) as u32
255        + payload_attrs.env.is_some() as u32
256        + payload_attrs.hostname.is_some() as u32
257        + payload_attrs.app_version.is_some() as u32
258        + has_attributes as u32;
259
260    write_map_len(writer, map_len)?;
261
262    if !metadata.language.is_empty() {
263        write_uint8(writer, trace_key::LANGUAGE_NAME)?;
264        table.write_interned(writer, &metadata.language)?;
265    }
266
267    if !metadata.language_version.is_empty() {
268        write_uint8(writer, trace_key::LANGUAGE_VERSION)?;
269        table.write_interned(writer, &metadata.language_version)?;
270    }
271
272    if !metadata.tracer_version.is_empty() {
273        write_uint8(writer, trace_key::TRACER_VERSION)?;
274        table.write_interned(writer, &metadata.tracer_version)?;
275    }
276
277    if !metadata.runtime_id.is_empty() {
278        write_uint8(writer, trace_key::RUNTIME_ID)?;
279        table.write_interned(writer, &metadata.runtime_id)?;
280    }
281
282    if let Some(env) = payload_attrs.env {
283        write_uint8(writer, trace_key::ENV_REF)?;
284        table.write_interned(writer, env)?;
285    }
286
287    if let Some(hostname) = payload_attrs.hostname {
288        write_uint8(writer, trace_key::HOSTNAME_REF)?;
289        table.write_interned(writer, hostname)?;
290    }
291
292    if let Some(app_version) = payload_attrs.app_version {
293        write_uint8(writer, trace_key::APP_VERSION_REF)?;
294        table.write_interned(writer, app_version)?;
295    }
296
297    if has_attributes {
298        // Encoded as a flat array of triplets: [key, type_uint, value, ...]
299        // String values use type discriminant 1.
300        write_uint8(writer, trace_key::ATTRIBUTES)?;
301        write_array_len(writer, attr_count * 3)?;
302        if let Some(v) = payload_attrs.apm_mode {
303            table.write_interned(writer, "_dd.apm_mode")?;
304            write_uint8(writer, span_v04::AnyValueKey::String as u8)?;
305            table.write_interned(writer, v)?;
306        }
307        if let Some(v) = payload_attrs.git_commit_sha {
308            table.write_interned(writer, "_dd.git.commit.sha")?;
309            write_uint8(writer, span_v04::AnyValueKey::String as u8)?;
310            table.write_interned(writer, v)?;
311        }
312    }
313
314    write_uint8(writer, trace_key::CHUNKS)?;
315    write_array_len(writer, traces.len() as u32)?;
316    for trace in traces {
317        encode_chunk(writer, trace.as_ref(), &mut table)?;
318    }
319
320    Ok(())
321}
322
323/// Encodes one chunk (a group of spans sharing a trace ID).
324///
325/// ```text
326/// Map {
327///   chunk_key::TRACE_ID           (6) → bin[16]       // 128-bit big-endian
328///   chunk_key::ORIGIN             (2) → str|uint       // optional, interned
329///   chunk_key::PRIORITY           (1) → int            // optional
330///   chunk_key::SAMPLING_MECHANISM (7) → uint           // optional
331///   chunk_key::SPANS              (4) → Array[Span, ...]
332/// }
333/// ```
334fn encode_chunk<W: RmpWrite, T: TraceData>(
335    writer: &mut W,
336    spans: &[Span<T>],
337    table: &mut StringTable,
338) -> Result<(), ValueWriteError<W::Error>> {
339    let attrs = extract_chunk_attrs(spans);
340
341    let fields = 2u32 // trace_id + spans are always present
342        + attrs.origin.is_some() as u32
343        + attrs.sampling_priority.is_some() as u32
344        + attrs.sampling_mechanism.is_some() as u32;
345
346    write_map_len(writer, fields)?;
347
348    write_uint8(writer, chunk_key::TRACE_ID)?;
349    write_bin(writer, &attrs.trace_id.to_be_bytes())?;
350
351    if let Some(origin) = attrs.origin {
352        write_uint8(writer, chunk_key::ORIGIN)?;
353        table.write_interned(writer, origin)?;
354    }
355
356    if let Some(priority) = attrs.sampling_priority {
357        write_uint8(writer, chunk_key::PRIORITY)?;
358        write_sint(writer, priority as i64)?;
359    }
360
361    if let Some(mechanism) = attrs.sampling_mechanism {
362        write_uint8(writer, chunk_key::SAMPLING_MECHANISM)?;
363        write_uint(writer, mechanism as u64)?;
364    }
365
366    write_uint8(writer, chunk_key::SPANS)?;
367    write_array_len(writer, spans.len() as u32)?;
368    for span in spans {
369        span_v04::encode_span(writer, span, table)?;
370    }
371
372    Ok(())
373}
374
375/// Serializes traces into a slice using the V1 msgpack format.
376///
377/// # Errors
378/// Returns a `ValueWriteError` if the underlying writer fails.
379pub fn write_to_slice<T: TraceData, S: AsRef<[Span<T>]>>(
380    // &mut &mut [u8] lets the caller see the slice shrink as bytes are written.
381    slice: &mut &mut [u8],
382    traces: &[S],
383    metadata: &TracerMetadata,
384) -> Result<(), ValueWriteError> {
385    encode_payload(slice, traces, metadata)
386}
387
388/// Serializes traces into a `Vec<u8>` using the V1 msgpack format.
389pub fn to_vec<T: TraceData, S: AsRef<[Span<T>]>>(
390    traces: &[S],
391    metadata: &TracerMetadata,
392) -> Vec<u8> {
393    to_vec_with_capacity(traces, 0, metadata)
394}
395
396/// Serializes traces into a `Vec<u8>` with a pre-allocated capacity.
397pub fn to_vec_with_capacity<T: TraceData, S: AsRef<[Span<T>]>>(
398    traces: &[S],
399    capacity: u32,
400    metadata: &TracerMetadata,
401) -> Vec<u8> {
402    let mut buf = ByteBuf::with_capacity(capacity as usize);
403    encode_payload(&mut buf, traces, metadata)
404        .map_err(super::flatten_value_write_infallible)
405        .unwrap_infallible();
406    buf.into_vec()
407}
408
409/// Returns the number of bytes the V1 payload for `traces` would occupy.
410pub fn to_encoded_byte_len<T: TraceData, S: AsRef<[Span<T>]>>(
411    traces: &[S],
412    metadata: &TracerMetadata,
413) -> u32 {
414    let mut counter = super::CountLength(0);
415    // `CountLength` impls `std::io::Write` (whose error type is `std::io::Error`, not
416    // `Infallible`), so we can't statically prove infallibility via `unwrap_infallible`
417    // the way we do for `ByteBuf`. In practice `CountLength::write*` only ever return
418    // `Ok`, so the error path here is unreachable today; should `CountLength` ever grow
419    // a fallible code path, fuzz tests on the msgpack encoded length would catch it.
420    let _ = encode_payload(&mut counter, traces, metadata);
421    counter.0
422}
423
#[cfg(test)]
mod tests {
    // Unit tests for the V1 encoder. Byte-level checks rely on msgpack's compact forms:
    // small keys and small unsigned values are single positive-fixint bytes.
    use super::*;
    use crate::span::v04::SpanBytes;
    use libdd_tinybytes::BytesString;
    use std::collections::HashMap;

    /// Builds a minimal span with the given identity fields and fixed timing.
    fn make_span(
        service: &str,
        name: &str,
        trace_id: u128,
        span_id: u64,
        parent_id: u64,
    ) -> SpanBytes {
        SpanBytes {
            service: BytesString::from_slice(service.as_bytes()).unwrap(),
            name: BytesString::from_slice(name.as_bytes()).unwrap(),
            resource: BytesString::from_slice(b"res").unwrap(),
            trace_id,
            span_id,
            parent_id,
            start: 1_000_000,
            duration: 500,
            ..Default::default()
        }
    }

    /// True when `needle` (non-empty) occurs as a contiguous byte run in `haystack`.
    fn contains(haystack: &[u8], needle: &[u8]) -> bool {
        haystack.windows(needle.len()).any(|w| w == needle)
    }

    #[test]
    fn test_to_vec_non_empty() {
        let spans = vec![make_span("svc", "op", 42, 1, 0)];
        let traces = vec![spans];
        let encoded = to_vec(&traces, &TracerMetadata::default());
        assert!(!encoded.is_empty());
    }

    #[test]
    fn test_to_vec_empty_traces() {
        let traces: Vec<Vec<SpanBytes>> = vec![];
        let encoded = to_vec(&traces, &TracerMetadata::default());
        // Must still produce a valid msgpack map with an empty chunks array:
        // fixmap(1), key CHUNKS, fixarray(0).
        assert_eq!(encoded, [0x81, trace_key::CHUNKS, 0x90]);
    }

    #[test]
    fn test_string_table_wire_format() {
        let mut table = StringTable::new();
        let mut buf: Vec<u8> = Vec::new();
        for s in ["a", "b", "a", ""] {
            table.write_interned(&mut buf, s).unwrap();
        }
        // "a" → fixstr, "b" → fixstr, "a" again → uint ID 1, "" → reserved uint ID 0.
        assert_eq!(buf, [0xa1, b'a', 0xa1, b'b', 0x01, 0x00]);
    }

    #[test]
    fn test_string_interning_reduces_size() {
        // Two spans with the same service name — second occurrence should use the integer ID.
        let s1 = make_span("my-service", "op1", 1, 1, 0);
        let s2 = make_span("my-service", "op2", 2, 2, 0);
        let traces_two = vec![vec![s1], vec![s2]];

        // Single span for baseline.
        let s_single = make_span("my-service", "op1", 1, 1, 0);
        let traces_single = vec![vec![s_single]];

        let encoded_two = to_vec(&traces_two, &TracerMetadata::default());
        let encoded_single = to_vec(&traces_single, &TracerMetadata::default());

        // The two-trace payload should be less than 2× the single-trace payload
        // if interning is working (the second "my-service" is encoded as an integer).
        assert!(
            encoded_two.len() < 2 * encoded_single.len(),
            "Interning should reduce size: two={} single={}",
            encoded_two.len(),
            encoded_single.len()
        );
    }

    #[test]
    fn test_chunk_level_attrs_origin_and_priority() {
        let mut meta = HashMap::new();
        meta.insert(
            BytesString::from_static("_dd.origin"),
            BytesString::from_static("lambda"),
        );
        let mut metrics = HashMap::new();
        metrics.insert(BytesString::from_static("_sampling_priority_v1"), 1.0f64);

        let root = SpanBytes {
            service: BytesString::from_slice(b"svc").unwrap(),
            name: BytesString::from_slice(b"op").unwrap(),
            resource: BytesString::from_slice(b"res").unwrap(),
            trace_id: 99,
            span_id: 1,
            parent_id: 0,
            start: 1000,
            duration: 100,
            meta,
            metrics,
            ..Default::default()
        };

        let encoded = to_vec(&[vec![root]], &TracerMetadata::default());
        assert!(!encoded.is_empty());
        // The payload must contain "lambda" somewhere (the origin string).
        assert!(
            contains(&encoded, b"lambda"),
            "origin 'lambda' should appear in payload"
        );
        // priority 1 → PRIORITY key followed by positive fixint 0x01.
        assert!(
            contains(&encoded, &[chunk_key::PRIORITY, 0x01]),
            "sampling_priority 1 should be encoded at chunk level"
        );
    }

    #[test]
    fn test_to_encoded_byte_len_matches_to_vec() {
        let spans = vec![
            make_span("svc", "op", 1, 1, 0),
            make_span("svc", "child", 1, 2, 1),
        ];
        let traces = vec![spans];
        let meta = TracerMetadata::default();
        let encoded = to_vec(&traces, &meta);
        let len = to_encoded_byte_len(&traces, &meta);
        assert_eq!(encoded.len() as u32, len);
    }

    #[test]
    fn test_remote_parent_root_span_top_level() {
        // A span with a non-zero parent_id but _dd.top_level=1.0 is a root in its chunk.
        let mut metrics = HashMap::new();
        metrics.insert(BytesString::from_static("_dd.top_level"), 1.0f64);
        metrics.insert(BytesString::from_static("_sampling_priority_v1"), 2.0f64);

        let root = SpanBytes {
            service: BytesString::from_slice(b"svc").unwrap(),
            name: BytesString::from_slice(b"op").unwrap(),
            resource: BytesString::from_slice(b"res").unwrap(),
            trace_id: 123,
            span_id: 42,
            parent_id: 999, // remote parent — not in this chunk
            start: 1000,
            duration: 100,
            metrics,
            ..Default::default()
        };

        let encoded = to_vec(&[vec![root]], &TracerMetadata::default());
        assert!(!encoded.is_empty());
        // The top-level span's priority must be promoted to the chunk.
        assert!(
            contains(&encoded, &[chunk_key::PRIORITY, 0x02]),
            "sampling_priority 2 from the top-level root should appear"
        );
    }

    #[test]
    fn test_payload_promoted_fields() {
        let mut meta = HashMap::new();
        meta.insert(
            BytesString::from_static("env"),
            BytesString::from_static("prod"),
        );
        meta.insert(
            BytesString::from_static("version"),
            BytesString::from_static("1.2.3"),
        );
        meta.insert(
            BytesString::from_static("_dd.hostname"),
            BytesString::from_static("my-host"),
        );

        let span = SpanBytes {
            service: BytesString::from_slice(b"svc").unwrap(),
            name: BytesString::from_slice(b"op").unwrap(),
            resource: BytesString::from_slice(b"res").unwrap(),
            trace_id: 1,
            span_id: 1,
            parent_id: 0,
            start: 1000,
            duration: 100,
            meta,
            ..Default::default()
        };

        let encoded = to_vec(&[vec![span]], &TracerMetadata::default());
        assert!(
            contains(&encoded, b"prod"),
            "env 'prod' should appear in payload"
        );
        assert!(
            contains(&encoded, b"my-host"),
            "hostname 'my-host' should appear in payload"
        );
        assert!(
            contains(&encoded, b"1.2.3"),
            "version '1.2.3' should appear in payload"
        );
    }

    #[test]
    fn test_payload_attributes_apm_mode_and_git_commit_sha() {
        let mut meta = HashMap::new();
        meta.insert(
            BytesString::from_static("_dd.apm_mode"),
            BytesString::from_static("ssi"),
        );
        meta.insert(
            BytesString::from_static("_dd.git.commit.sha"),
            BytesString::from_static("abc123"),
        );

        let span = SpanBytes {
            service: BytesString::from_slice(b"svc").unwrap(),
            name: BytesString::from_slice(b"op").unwrap(),
            resource: BytesString::from_slice(b"res").unwrap(),
            trace_id: 1,
            span_id: 1,
            parent_id: 0,
            start: 1000,
            duration: 100,
            meta,
            ..Default::default()
        };

        let encoded = to_vec(&[vec![span]], &TracerMetadata::default());

        // Both attribute strings must appear in the payload bytes.
        assert!(
            contains(&encoded, b"ssi"),
            "apm_mode 'ssi' should appear in payload"
        );
        assert!(
            contains(&encoded, b"abc123"),
            "git commit sha 'abc123' should appear in payload"
        );
        // The attribute key names must also be present (first occurrence is a raw str).
        assert!(
            contains(&encoded, b"_dd.apm_mode"),
            "_dd.apm_mode key should appear in payload"
        );
        assert!(
            contains(&encoded, b"_dd.git.commit.sha"),
            "_dd.git.commit.sha key should appear in payload"
        );
    }

    #[test]
    fn test_payload_attributes_absent_when_no_relevant_tags() {
        // A span with no _dd.apm_mode or _dd.git.commit.sha must not produce key 10.
        let span = make_span("svc", "op", 1, 1, 0);
        let encoded = to_vec(&[vec![span]], &TracerMetadata::default());
        assert!(
            !contains(&encoded, b"_dd.apm_mode"),
            "key 10 should be absent when no relevant tags are set"
        );
        assert!(
            !contains(&encoded, b"_dd.git.commit.sha"),
            "key 10 should be absent when no relevant tags are set"
        );
    }

    #[test]
    fn test_payload_metadata_fields_present() {
        let span = make_span("svc", "op", 1, 1, 0);
        let metadata = TracerMetadata {
            language: "python".to_string(),
            language_version: "3.11".to_string(),
            tracer_version: "2.0.0".to_string(),
            runtime_id: "abc-123-uuid".to_string(),
            ..Default::default()
        };
        let encoded = to_vec(&[vec![span]], &metadata);

        for s in &[b"python" as &[u8], b"3.11", b"2.0.0", b"abc-123-uuid"] {
            assert!(
                contains(&encoded, s),
                "{} should appear in payload",
                std::str::from_utf8(s).unwrap()
            );
        }
    }

    #[test]
    fn test_payload_metadata_absent_when_empty() {
        let span = make_span("svc", "op", 1, 1, 0);
        let encoded_with = to_vec(
            &[vec![span.clone()]],
            &TracerMetadata {
                language: "go".to_string(),
                ..Default::default()
            },
        );
        let encoded_without = to_vec(&[vec![span]], &TracerMetadata::default());
        // Payload with metadata must be larger (it carries extra fields).
        assert!(encoded_with.len() > encoded_without.len());
    }

    #[test]
    fn test_128bit_trace_id_from_dd_p_tid() {
        let mut meta = HashMap::new();
        meta.insert(
            BytesString::from_static("_dd.p.tid"),
            BytesString::from_static("640cfd5400000000"),
        );
        let span = SpanBytes {
            service: BytesString::from_slice(b"svc").unwrap(),
            name: BytesString::from_slice(b"op").unwrap(),
            resource: BytesString::from_slice(b"res").unwrap(),
            trace_id: 0x0123456789abcdef,
            span_id: 1,
            parent_id: 0,
            start: 1000,
            duration: 100,
            meta,
            ..Default::default()
        };
        let encoded = to_vec(&[vec![span]], &TracerMetadata::default());

        // Expected 16-byte BE: high = 0x640cfd5400000000, low = 0x0123456789abcdef
        let expected = [
            0x64, 0x0c, 0xfd, 0x54, 0x00, 0x00, 0x00, 0x00, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab,
            0xcd, 0xef,
        ];
        assert!(
            contains(&encoded, &expected),
            "128-bit trace_id big-endian bytes should appear in payload"
        );
        // _dd.p.tid must not also leak into span attributes.
        assert!(
            !contains(&encoded, b"_dd.p.tid"),
            "_dd.p.tid should be consumed, not encoded as a span attribute"
        );
    }

    #[test]
    fn test_128bit_trace_id_without_dd_p_tid() {
        // Absent _dd.p.tid → high 64 bits zero.
        let span = make_span("svc", "op", 0x0123456789abcdef, 1, 0);
        let encoded = to_vec(&[vec![span]], &TracerMetadata::default());
        let expected = [
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab,
            0xcd, 0xef,
        ];
        assert!(
            contains(&encoded, &expected),
            "absent _dd.p.tid should yield zero high 64 bits"
        );
    }

    #[test]
    fn test_sampling_mechanism_negative_value() {
        // `_dd.p.dm` is a signed integer stored as a string (e.g. "-4" → manual rule).
        // The encoder must parse it, take unsigned_abs, and emit it at chunk level.
        let mut meta = HashMap::new();
        meta.insert(
            BytesString::from_static("_dd.p.dm"),
            BytesString::from_static("-4"),
        );
        let root = SpanBytes {
            service: BytesString::from_slice(b"svc").unwrap(),
            name: BytesString::from_slice(b"op").unwrap(),
            resource: BytesString::from_slice(b"res").unwrap(),
            trace_id: 1,
            span_id: 1,
            parent_id: 0,
            start: 1000,
            duration: 100,
            meta,
            ..Default::default()
        };
        let encoded = to_vec(&[vec![root]], &TracerMetadata::default());

        // The chunk-level sampling_mechanism (key 7) must be encoded as uint 4.
        // The byte sequence is `chunk_key::SAMPLING_MECHANISM (0x07)` followed by the
        // msgpack representation of 4 (positive fixint 0x04).
        let expected = [chunk_key::SAMPLING_MECHANISM, 0x04];
        assert!(
            contains(&encoded, &expected),
            "sampling_mechanism should be encoded as unsigned_abs(\"-4\") = 4"
        );
    }

    #[test]
    fn test_chunk_attrs_fallback_no_root_span() {
        // Partial flush: no root span (every span has a non-zero parent and no
        // `_dd.top_level`). Values must be accumulated from non-root spans.
        let mut meta1 = HashMap::new();
        meta1.insert(
            BytesString::from_static("_dd.origin"),
            BytesString::from_static("lambda"),
        );
        let mut metrics2 = HashMap::new();
        metrics2.insert(BytesString::from_static("_sampling_priority_v1"), 2.0f64);
        let mut meta3 = HashMap::new();
        meta3.insert(
            BytesString::from_static("_dd.p.dm"),
            BytesString::from_static("-3"),
        );

        let s1 = SpanBytes {
            service: BytesString::from_slice(b"svc").unwrap(),
            name: BytesString::from_slice(b"op1").unwrap(),
            resource: BytesString::from_slice(b"res").unwrap(),
            trace_id: 1,
            span_id: 11,
            parent_id: 10, // non-zero parent → not a root
            start: 1000,
            duration: 100,
            meta: meta1,
            ..Default::default()
        };
        let s2 = SpanBytes {
            service: BytesString::from_slice(b"svc").unwrap(),
            name: BytesString::from_slice(b"op2").unwrap(),
            resource: BytesString::from_slice(b"res").unwrap(),
            trace_id: 1,
            span_id: 12,
            parent_id: 11,
            start: 1000,
            duration: 100,
            metrics: metrics2,
            ..Default::default()
        };
        let s3 = SpanBytes {
            service: BytesString::from_slice(b"svc").unwrap(),
            name: BytesString::from_slice(b"op3").unwrap(),
            resource: BytesString::from_slice(b"res").unwrap(),
            trace_id: 1,
            span_id: 13,
            parent_id: 12,
            start: 1000,
            duration: 100,
            meta: meta3,
            ..Default::default()
        };
        let encoded = to_vec(&[vec![s1, s2, s3]], &TracerMetadata::default());

        // Each attribute must be present at chunk level — collected from a different
        // non-root span.
        assert!(
            contains(&encoded, b"lambda"),
            "origin 'lambda' from span 1 should appear in payload"
        );
        // priority 2 → msgpack positive fixint 0x02 preceded by PRIORITY key
        assert!(
            contains(&encoded, &[chunk_key::PRIORITY, 0x02]),
            "sampling_priority 2 from span 2 should appear"
        );
        // sampling_mechanism = unsigned_abs("-3") = 3 → 0x03 preceded by SAMPLING_MECHANISM key
        assert!(
            contains(&encoded, &[chunk_key::SAMPLING_MECHANISM, 0x03]),
            "sampling_mechanism 3 from span 3 should appear"
        );
    }
}