Skip to main content

libdd_trace_utils/msgpack_encoder/v1/
mod.rs

1// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4mod span_v04;
5
6use crate::span::v04::Span;
7use crate::span::TraceData;
8use crate::tracer_metadata::TracerMetadata;
9use rmp::encode::{
10    write_array_len, write_bin, write_map_len, write_sint, write_str, write_uint, write_uint8,
11    ByteBuf, RmpWrite, ValueWriteError,
12};
13use std::borrow::Borrow;
14use std::collections::HashMap;
15
/// Integer map keys used in the top-level V1 trace payload map.
mod trace_key {
    /// Tracer language name (from `TracerMetadata::language`), interned string.
    pub const LANGUAGE_NAME: u8 = 3;
    /// Language runtime version (from `TracerMetadata::language_version`), interned string.
    pub const LANGUAGE_VERSION: u8 = 4;
    /// Tracer library version (from `TracerMetadata::tracer_version`), interned string.
    pub const TRACER_VERSION: u8 = 5;
    /// Runtime identifier (from `TracerMetadata::runtime_id`), interned string.
    pub const RUNTIME_ID: u8 = 6;
    /// Environment name, interned string.
    pub const ENV_REF: u8 = 7;
    /// Hostname, interned string.
    pub const HOSTNAME_REF: u8 = 8;
    /// Application version, interned string.
    pub const APP_VERSION_REF: u8 = 9;
    /// Payload-level attributes (e.g. `_dd.apm_mode`, `_dd.git.commit.sha`), written as a flat
    /// array of `key, type, value` triplets.
    pub const ATTRIBUTES: u8 = 10;
    /// Array of trace chunks.
    pub const CHUNKS: u8 = 11;
}
29
/// Integer map keys used inside each V1 chunk map.
mod chunk_key {
    /// Sampling priority (taken from the `_sampling_priority_v1` metric), signed integer.
    pub const PRIORITY: u8 = 1;
    /// Trace origin (taken from the `_dd.origin` meta tag), interned string.
    pub const ORIGIN: u8 = 2;
    /// Array of the chunk's spans.
    pub const SPANS: u8 = 4;
    /// Full 128-bit trace ID as 16 big-endian bytes.
    pub const TRACE_ID: u8 = 6;
    /// Sampling mechanism — the magnitude of the former `_dd.p.dm` span tag.
    pub const SAMPLING_MECHANISM: u8 = 7;
}
39
40/// Streaming string intern table.
41///
42/// The first time a string is written, it is emitted as a msgpack `str` and assigned an
43/// incrementing integer ID. On subsequent occurrences only the ID is emitted as a msgpack `uint`.
44/// ID 0 is reserved for the empty string (pre-inserted in the constructor).
45///
46/// The string table is scoped per payload: each `to_vec` / `write_to_slice` call starts with a
47/// fresh table so deduplication is payload-local.
48pub(crate) struct StringTable {
49    seen: HashMap<String, u32>,
50}
51
52impl StringTable {
53    fn new() -> Self {
54        let mut seen = HashMap::new();
55        seen.insert(String::new(), 0);
56        Self { seen }
57    }
58
59    /// Writes `s` to `writer` using string interning.
60    ///
61    /// - First occurrence of `s` → msgpack `str`, ID recorded for future references
62    /// - Subsequent occurrence → msgpack `uint` carrying the previously assigned ID
63    pub(crate) fn write_interned<W: RmpWrite, S: AsRef<str>>(
64        &mut self,
65        writer: &mut W,
66        s: S,
67    ) -> Result<(), ValueWriteError<W::Error>> {
68        let s = s.as_ref();
69        if let Some(&id) = self.seen.get(s) {
70            write_uint(writer, id as u64)?;
71        } else {
72            let id = self.seen.len() as u32;
73            self.seen.insert(s.to_string(), id);
74            write_str(writer, s)?;
75        }
76        Ok(())
77    }
78}
79
/// Values written as top-level payload fields. Each is taken from `TracerMetadata` or, when
/// missing there, from span meta tags.
struct PayloadAttrs<'a> {
    /// Environment name (written under `ENV_REF`).
    env: Option<&'a str>,
    /// Hostname (written under `HOSTNAME_REF`).
    hostname: Option<&'a str>,
    /// Application version (written under `APP_VERSION_REF`).
    app_version: Option<&'a str>,
    /// Value of the `_dd.apm_mode` span tag, emitted as a payload-level attribute.
    apm_mode: Option<&'a str>,
    /// Value of `_dd.git.commit.sha`, emitted as a payload-level attribute.
    git_commit_sha: Option<&'a str>,
}
90
91fn extract_payload_attrs<'a, T: TraceData + 'a, S: AsRef<[Span<T>]>>(
92    traces: &'a [S],
93    metadata: &'a TracerMetadata,
94) -> PayloadAttrs<'a>
95where
96    T::Text: 'a,
97{
98    // Prefer TracerMetadata (set once on the builder) over span scanning. Fall back to
99    // span meta only when the builder-level value is missing — e.g. v04 payloads where
100    // the SDK propagated these as span tags.
101    let mut env = (!metadata.env.is_empty()).then_some(metadata.env.as_str());
102    let mut hostname = (!metadata.hostname.is_empty()).then_some(metadata.hostname.as_str());
103    let mut app_version =
104        (!metadata.app_version.is_empty()).then_some(metadata.app_version.as_str());
105    let mut git_commit_sha =
106        (!metadata.git_commit_sha.is_empty()).then_some(metadata.git_commit_sha.as_str());
107    let mut apm_mode = None;
108
109    'outer: for trace in traces {
110        for span in trace.as_ref() {
111            if env.is_none() {
112                env = span.meta.get("env").map(|v| v.borrow());
113            }
114            if hostname.is_none() {
115                hostname = span.meta.get("_dd.hostname").map(|v| v.borrow());
116            }
117            if app_version.is_none() {
118                app_version = span.meta.get("version").map(|v| v.borrow());
119            }
120            if apm_mode.is_none() {
121                apm_mode = span.meta.get("_dd.apm_mode").map(|v| v.borrow());
122            }
123            if git_commit_sha.is_none() {
124                git_commit_sha = span.meta.get("_dd.git.commit.sha").map(|v| v.borrow());
125            }
126            if env.is_some()
127                && hostname.is_some()
128                && app_version.is_some()
129                && apm_mode.is_some()
130                && git_commit_sha.is_some()
131            {
132                break 'outer;
133            }
134        }
135    }
136
137    PayloadAttrs {
138        env,
139        hostname,
140        app_version,
141        apm_mode,
142        git_commit_sha,
143    }
144}
145
/// Chunk-level fields derived from a chunk's spans.
///
/// Apart from `trace_id`, each value comes from the chunk's root span when one is found.
/// Otherwise it comes from the first span that carries it.
struct ChunkAttrs<'a> {
    /// Full 128-bit trace ID; written as 16 big-endian bytes.
    trace_id: u128,
    /// Integer value of the `_sampling_priority_v1` metric.
    sampling_priority: Option<i32>,
    /// Value of the `_dd.origin` meta tag.
    origin: Option<&'a str>,
    /// Magnitude of the integer stored in the `_dd.p.dm` meta tag.
    sampling_mechanism: Option<u32>,
}
157
158fn extract_chunk_attrs<'a, T: TraceData>(spans: &'a [Span<T>]) -> ChunkAttrs<'a>
159where
160    T::Text: 'a,
161{
162    // trace_id is invariant per chunk. The v04 wire format carries only the low 64 bits;
163    // the high 64 bits are propagated as the hex string meta tag "_dd.p.tid".
164    let trace_id = spans
165        .first()
166        .map(|s| {
167            let high = s
168                .meta
169                .get("_dd.p.tid")
170                .and_then(|v| u64::from_str_radix(v.borrow(), 16).ok())
171                .unwrap_or(0);
172            ((high as u128) << 64) | s.trace_id
173        })
174        .unwrap_or(0);
175
176    let mut sampling_priority = None;
177    let mut origin = None;
178    let mut sampling_mechanism = None;
179
180    for span in spans {
181        // Root span: either no parent in this chunk, or tagged _dd.top_level=1 (remote parent).
182        let is_root =
183            span.parent_id == 0 || span.metrics.get("_dd.top_level").copied().unwrap_or(0.0) == 1.0;
184
185        if is_root {
186            // Root span is authoritative: its values supersede any non-root fallback,
187            // including absence (a field missing on the root should not be filled from non-roots).
188            sampling_priority = span.metrics.get("_sampling_priority_v1").map(|v| *v as i32);
189            origin = span.meta.get("_dd.origin").map(|v| v.borrow());
190            // _dd.p.dm is a signed integer stored as a string; unsigned_abs preserves the
191            // magnitude.
192            sampling_mechanism = span
193                .meta
194                .get("_dd.p.dm")
195                .and_then(|v| v.borrow().parse::<i32>().ok())
196                .map(|dm| dm.unsigned_abs());
197            break;
198        }
199
200        // No root found yet — accumulate fallback values from non-root spans (partial flush).
201        // Root span values will override these if a root is eventually encountered.
202        if sampling_priority.is_none() {
203            sampling_priority = span.metrics.get("_sampling_priority_v1").map(|v| *v as i32);
204        }
205        if origin.is_none() {
206            origin = span.meta.get("_dd.origin").map(|v| v.borrow());
207        }
208        if sampling_mechanism.is_none() {
209            sampling_mechanism = span
210                .meta
211                .get("_dd.p.dm")
212                .and_then(|v| v.borrow().parse::<i32>().ok())
213                .map(|dm| dm.unsigned_abs());
214        }
215    }
216
217    ChunkAttrs {
218        trace_id,
219        sampling_priority,
220        origin,
221        sampling_mechanism,
222    }
223}
224
225/// Encodes all traces as a V1 msgpack payload.
226///
227/// Top-level format:
228/// ```text
229/// Map {
230///   trace_key::ENV_REF      (7)  → str|uint       // optional, interned
231///   trace_key::HOSTNAME_REF (8)  → str|uint       // optional, interned
232///   trace_key::APP_VERSION  (9)  → str|uint       // optional, interned
233///   trace_key::ATTRIBUTES   (10) → Array[...]     // optional, flat triplets: key, type, value
234///   trace_key::CHUNKS       (11) → Array[Chunk, ...]
235/// }
236/// ```
237fn encode_payload<W: RmpWrite, T: TraceData, S: AsRef<[Span<T>]>>(
238    writer: &mut W,
239    traces: &[S],
240    metadata: &TracerMetadata,
241) -> Result<(), ValueWriteError<W::Error>> {
242    let mut table = StringTable::new();
243    let payload_attrs = extract_payload_attrs(traces, metadata);
244
245    let attr_count =
246        payload_attrs.apm_mode.is_some() as u32 + payload_attrs.git_commit_sha.is_some() as u32;
247    let has_attributes = attr_count > 0;
248
249    let map_len = 1u32 // chunks always present
250        + (!metadata.language.is_empty()) as u32
251        + (!metadata.language_version.is_empty()) as u32
252        + (!metadata.tracer_version.is_empty()) as u32
253        + (!metadata.runtime_id.is_empty()) as u32
254        + payload_attrs.env.is_some() as u32
255        + payload_attrs.hostname.is_some() as u32
256        + payload_attrs.app_version.is_some() as u32
257        + has_attributes as u32;
258
259    write_map_len(writer, map_len)?;
260
261    if !metadata.language.is_empty() {
262        write_uint8(writer, trace_key::LANGUAGE_NAME)?;
263        table.write_interned(writer, &metadata.language)?;
264    }
265
266    if !metadata.language_version.is_empty() {
267        write_uint8(writer, trace_key::LANGUAGE_VERSION)?;
268        table.write_interned(writer, &metadata.language_version)?;
269    }
270
271    if !metadata.tracer_version.is_empty() {
272        write_uint8(writer, trace_key::TRACER_VERSION)?;
273        table.write_interned(writer, &metadata.tracer_version)?;
274    }
275
276    if !metadata.runtime_id.is_empty() {
277        write_uint8(writer, trace_key::RUNTIME_ID)?;
278        table.write_interned(writer, &metadata.runtime_id)?;
279    }
280
281    if let Some(env) = payload_attrs.env {
282        write_uint8(writer, trace_key::ENV_REF)?;
283        table.write_interned(writer, env)?;
284    }
285
286    if let Some(hostname) = payload_attrs.hostname {
287        write_uint8(writer, trace_key::HOSTNAME_REF)?;
288        table.write_interned(writer, hostname)?;
289    }
290
291    if let Some(app_version) = payload_attrs.app_version {
292        write_uint8(writer, trace_key::APP_VERSION_REF)?;
293        table.write_interned(writer, app_version)?;
294    }
295
296    if has_attributes {
297        // Encoded as a flat array of triplets: [key, type_uint, value, ...]
298        // String values use type discriminant 1.
299        write_uint8(writer, trace_key::ATTRIBUTES)?;
300        write_array_len(writer, attr_count * 3)?;
301        if let Some(v) = payload_attrs.apm_mode {
302            table.write_interned(writer, "_dd.apm_mode")?;
303            write_uint8(writer, span_v04::AnyValueKey::String as u8)?;
304            table.write_interned(writer, v)?;
305        }
306        if let Some(v) = payload_attrs.git_commit_sha {
307            table.write_interned(writer, "_dd.git.commit.sha")?;
308            write_uint8(writer, span_v04::AnyValueKey::String as u8)?;
309            table.write_interned(writer, v)?;
310        }
311    }
312
313    write_uint8(writer, trace_key::CHUNKS)?;
314    write_array_len(writer, traces.len() as u32)?;
315    for trace in traces {
316        encode_chunk(writer, trace.as_ref(), &mut table)?;
317    }
318
319    Ok(())
320}
321
322/// Encodes one chunk (a group of spans sharing a trace ID).
323///
324/// ```text
325/// Map {
326///   chunk_key::TRACE_ID           (6) → bin[16]       // 128-bit big-endian
327///   chunk_key::ORIGIN             (2) → str|uint       // optional, interned
328///   chunk_key::PRIORITY           (1) → int            // optional
329///   chunk_key::SAMPLING_MECHANISM (7) → uint           // optional
330///   chunk_key::SPANS              (4) → Array[Span, ...]
331/// }
332/// ```
333fn encode_chunk<W: RmpWrite, T: TraceData>(
334    writer: &mut W,
335    spans: &[Span<T>],
336    table: &mut StringTable,
337) -> Result<(), ValueWriteError<W::Error>> {
338    let attrs = extract_chunk_attrs(spans);
339
340    let fields = 2u32 // trace_id + spans are always present
341        + attrs.origin.is_some() as u32
342        + attrs.sampling_priority.is_some() as u32
343        + attrs.sampling_mechanism.is_some() as u32;
344
345    write_map_len(writer, fields)?;
346
347    write_uint8(writer, chunk_key::TRACE_ID)?;
348    write_bin(writer, &attrs.trace_id.to_be_bytes())?;
349
350    if let Some(origin) = attrs.origin {
351        write_uint8(writer, chunk_key::ORIGIN)?;
352        table.write_interned(writer, origin)?;
353    }
354
355    if let Some(priority) = attrs.sampling_priority {
356        write_uint8(writer, chunk_key::PRIORITY)?;
357        write_sint(writer, priority as i64)?;
358    }
359
360    if let Some(mechanism) = attrs.sampling_mechanism {
361        write_uint8(writer, chunk_key::SAMPLING_MECHANISM)?;
362        write_uint(writer, mechanism as u64)?;
363    }
364
365    write_uint8(writer, chunk_key::SPANS)?;
366    write_array_len(writer, spans.len() as u32)?;
367    for span in spans {
368        span_v04::encode_span(writer, span, table)?;
369    }
370
371    Ok(())
372}
373
374/// Serializes traces into a slice using the V1 msgpack format.
375///
376/// # Errors
377/// Returns a `ValueWriteError` if the underlying writer fails.
378pub fn write_to_slice<T: TraceData, S: AsRef<[Span<T>]>>(
379    // &mut &mut [u8] lets the caller see the slice shrink as bytes are written.
380    slice: &mut &mut [u8],
381    traces: &[S],
382    metadata: &TracerMetadata,
383) -> Result<(), ValueWriteError> {
384    encode_payload(slice, traces, metadata)
385}
386
387/// Serializes traces into a `Vec<u8>` using the V1 msgpack format.
388pub fn to_vec<T: TraceData, S: AsRef<[Span<T>]>>(
389    traces: &[S],
390    metadata: &TracerMetadata,
391) -> Vec<u8> {
392    to_vec_with_capacity(traces, 0, metadata)
393}
394
395/// Serializes traces into a `Vec<u8>` with a pre-allocated capacity.
396pub fn to_vec_with_capacity<T: TraceData, S: AsRef<[Span<T>]>>(
397    traces: &[S],
398    capacity: u32,
399    metadata: &TracerMetadata,
400) -> Vec<u8> {
401    let mut buf = ByteBuf::with_capacity(capacity as usize);
402    let _ = encode_payload(&mut buf, traces, metadata); // infallible: ByteBuf write never fails
403    buf.into_vec()
404}
405
406/// Returns the number of bytes the V1 payload for `traces` would occupy.
407pub fn to_encoded_byte_len<T: TraceData, S: AsRef<[Span<T>]>>(
408    traces: &[S],
409    metadata: &TracerMetadata,
410) -> u32 {
411    let mut counter = super::CountLength(0);
412    let _ = encode_payload(&mut counter, traces, metadata); // infallible: CountLength write never fails
413    counter.0
414}
415
#[cfg(test)]
mod tests {
    use super::*;
    use crate::span::v04::SpanBytes;
    use libdd_tinybytes::BytesString;
    use std::collections::HashMap;

    fn make_span(
        service: &str,
        name: &str,
        trace_id: u128,
        span_id: u64,
        parent_id: u64,
    ) -> SpanBytes {
        SpanBytes {
            service: BytesString::from_slice(service.as_bytes()).unwrap(),
            name: BytesString::from_slice(name.as_bytes()).unwrap(),
            resource: BytesString::from_slice(b"res").unwrap(),
            trace_id,
            span_id,
            parent_id,
            start: 1_000_000,
            duration: 500,
            ..Default::default()
        }
    }

    /// Returns `true` when `needle` appears as a contiguous byte run inside `haystack`.
    fn contains_bytes(haystack: &[u8], needle: &[u8]) -> bool {
        haystack.windows(needle.len()).any(|w| w == needle)
    }

    #[test]
    fn test_to_vec_non_empty() {
        let spans = vec![make_span("svc", "op", 42, 1, 0)];
        let traces = vec![spans];
        let encoded = to_vec(&traces, &TracerMetadata::default());
        assert!(!encoded.is_empty());
    }

    #[test]
    fn test_to_vec_empty_traces() {
        let traces: Vec<Vec<SpanBytes>> = vec![];
        let encoded = to_vec(&traces, &TracerMetadata::default());
        // Must still produce a valid msgpack map whose last entry is the CHUNKS key followed
        // by an empty array (fixarray marker 0x90).
        assert!(
            encoded.ends_with(&[trace_key::CHUNKS, 0x90]),
            "payload should end with an empty chunks array: {encoded:?}"
        );
    }

    #[test]
    fn test_string_interning_reduces_size() {
        // Two spans with the same service name — second occurrence should use the integer ID.
        let s1 = make_span("my-service", "op1", 1, 1, 0);
        let s2 = make_span("my-service", "op2", 2, 2, 0);
        let traces_two = vec![vec![s1], vec![s2]];

        // Single span for baseline.
        let s_single = make_span("my-service", "op1", 1, 1, 0);
        let traces_single = vec![vec![s_single]];

        let encoded_two = to_vec(&traces_two, &TracerMetadata::default());
        let encoded_single = to_vec(&traces_single, &TracerMetadata::default());

        // The two-trace payload should be less than 2× the single-trace payload
        // if interning is working (the second "my-service" is encoded as an integer).
        assert!(
            encoded_two.len() < 2 * encoded_single.len(),
            "Interning should reduce size: two={} single={}",
            encoded_two.len(),
            encoded_single.len()
        );
    }

    #[test]
    fn test_chunk_level_attrs_origin_and_priority() {
        let mut meta = HashMap::new();
        meta.insert(
            BytesString::from_static("_dd.origin"),
            BytesString::from_static("lambda"),
        );
        let mut metrics = HashMap::new();
        metrics.insert(BytesString::from_static("_sampling_priority_v1"), 1.0f64);

        let root = SpanBytes {
            service: BytesString::from_slice(b"svc").unwrap(),
            name: BytesString::from_slice(b"op").unwrap(),
            resource: BytesString::from_slice(b"res").unwrap(),
            trace_id: 99,
            span_id: 1,
            parent_id: 0,
            start: 1000,
            duration: 100,
            meta,
            metrics,
            ..Default::default()
        };

        let encoded = to_vec(&[vec![root]], &TracerMetadata::default());
        assert!(!encoded.is_empty());
        // The payload must contain "lambda" somewhere (the origin string).
        assert!(
            contains_bytes(&encoded, b"lambda"),
            "origin 'lambda' should appear in payload"
        );
        // priority 1 → msgpack positive fixint 0x01 preceded by the PRIORITY key.
        assert!(
            contains_bytes(&encoded, &[chunk_key::PRIORITY, 0x01]),
            "sampling_priority 1 from the root span should appear at chunk level"
        );
    }

    #[test]
    fn test_to_encoded_byte_len_matches_to_vec() {
        let spans = vec![
            make_span("svc", "op", 1, 1, 0),
            make_span("svc", "child", 1, 2, 1),
        ];
        let traces = vec![spans];
        let meta = TracerMetadata::default();
        let encoded = to_vec(&traces, &meta);
        let len = to_encoded_byte_len(&traces, &meta);
        assert_eq!(encoded.len() as u32, len);
    }

    #[test]
    fn test_remote_parent_root_span_top_level() {
        // A span with a non-zero parent_id but _dd.top_level=1.0 is a root in its chunk.
        let mut metrics = HashMap::new();
        metrics.insert(BytesString::from_static("_dd.top_level"), 1.0f64);
        metrics.insert(BytesString::from_static("_sampling_priority_v1"), 2.0f64);

        let root = SpanBytes {
            service: BytesString::from_slice(b"svc").unwrap(),
            name: BytesString::from_slice(b"op").unwrap(),
            resource: BytesString::from_slice(b"res").unwrap(),
            trace_id: 123,
            span_id: 42,
            parent_id: 999, // remote parent — not in this chunk
            start: 1000,
            duration: 100,
            metrics,
            ..Default::default()
        };

        let encoded = to_vec(&[vec![root]], &TracerMetadata::default());
        assert!(!encoded.is_empty());
        // The root's priority (2 → positive fixint 0x02) must be promoted to chunk level.
        assert!(
            contains_bytes(&encoded, &[chunk_key::PRIORITY, 0x02]),
            "sampling_priority 2 from the remote-parent root should appear at chunk level"
        );
    }

    #[test]
    fn test_payload_promoted_fields() {
        let mut meta = HashMap::new();
        meta.insert(
            BytesString::from_static("env"),
            BytesString::from_static("prod"),
        );
        meta.insert(
            BytesString::from_static("version"),
            BytesString::from_static("1.2.3"),
        );
        meta.insert(
            BytesString::from_static("_dd.hostname"),
            BytesString::from_static("my-host"),
        );

        let span = SpanBytes {
            service: BytesString::from_slice(b"svc").unwrap(),
            name: BytesString::from_slice(b"op").unwrap(),
            resource: BytesString::from_slice(b"res").unwrap(),
            trace_id: 1,
            span_id: 1,
            parent_id: 0,
            start: 1000,
            duration: 100,
            meta,
            ..Default::default()
        };

        let encoded = to_vec(&[vec![span]], &TracerMetadata::default());
        assert!(
            contains_bytes(&encoded, b"prod"),
            "env 'prod' should appear in payload"
        );
        assert!(
            contains_bytes(&encoded, b"my-host"),
            "hostname 'my-host' should appear in payload"
        );
        assert!(
            contains_bytes(&encoded, b"1.2.3"),
            "app version '1.2.3' should appear in payload"
        );
    }

    #[test]
    fn test_payload_attributes_apm_mode_and_git_commit_sha() {
        let mut meta = HashMap::new();
        meta.insert(
            BytesString::from_static("_dd.apm_mode"),
            BytesString::from_static("ssi"),
        );
        meta.insert(
            BytesString::from_static("_dd.git.commit.sha"),
            BytesString::from_static("abc123"),
        );

        let span = SpanBytes {
            service: BytesString::from_slice(b"svc").unwrap(),
            name: BytesString::from_slice(b"op").unwrap(),
            resource: BytesString::from_slice(b"res").unwrap(),
            trace_id: 1,
            span_id: 1,
            parent_id: 0,
            start: 1000,
            duration: 100,
            meta,
            ..Default::default()
        };

        let encoded = to_vec(&[vec![span]], &TracerMetadata::default());

        // Both attribute strings must appear in the payload bytes.
        assert!(
            contains_bytes(&encoded, b"ssi"),
            "apm_mode 'ssi' should appear in payload"
        );
        assert!(
            contains_bytes(&encoded, b"abc123"),
            "git commit sha 'abc123' should appear in payload"
        );
        // The attribute key names must also be present (first occurrence is a raw str).
        assert!(
            contains_bytes(&encoded, b"_dd.apm_mode"),
            "_dd.apm_mode key should appear in payload"
        );
        assert!(
            contains_bytes(&encoded, b"_dd.git.commit.sha"),
            "_dd.git.commit.sha key should appear in payload"
        );
    }

    #[test]
    fn test_payload_attributes_absent_when_no_relevant_tags() {
        // A span with no _dd.apm_mode or _dd.git.commit.sha must not produce any payload
        // attribute entries, so neither attribute key name may appear in the bytes.
        let span = make_span("svc", "op", 1, 1, 0);
        let encoded = to_vec(&[vec![span]], &TracerMetadata::default());
        assert!(
            !contains_bytes(&encoded, b"_dd.apm_mode"),
            "_dd.apm_mode attribute should be absent when no relevant tags are set"
        );
        assert!(
            !contains_bytes(&encoded, b"_dd.git.commit.sha"),
            "_dd.git.commit.sha attribute should be absent when no relevant tags are set"
        );
    }

    #[test]
    fn test_payload_metadata_fields_present() {
        let span = make_span("svc", "op", 1, 1, 0);
        let metadata = TracerMetadata {
            language: "python".to_string(),
            language_version: "3.11".to_string(),
            tracer_version: "2.0.0".to_string(),
            runtime_id: "abc-123-uuid".to_string(),
            ..Default::default()
        };
        let encoded = to_vec(&[vec![span]], &metadata);

        for s in &[b"python" as &[u8], b"3.11", b"2.0.0", b"abc-123-uuid"] {
            assert!(
                contains_bytes(&encoded, s),
                "{} should appear in payload",
                std::str::from_utf8(s).unwrap()
            );
        }
    }

    #[test]
    fn test_payload_metadata_absent_when_empty() {
        let span = make_span("svc", "op", 1, 1, 0);
        let encoded_with = to_vec(
            &[vec![span.clone()]],
            &TracerMetadata {
                language: "go".to_string(),
                ..Default::default()
            },
        );
        let encoded_without = to_vec(&[vec![span]], &TracerMetadata::default());
        // Payload with metadata must be larger (it carries extra fields).
        assert!(encoded_with.len() > encoded_without.len());
    }

    #[test]
    fn test_128bit_trace_id_from_dd_p_tid() {
        let mut meta = HashMap::new();
        meta.insert(
            BytesString::from_static("_dd.p.tid"),
            BytesString::from_static("640cfd5400000000"),
        );
        let span = SpanBytes {
            service: BytesString::from_slice(b"svc").unwrap(),
            name: BytesString::from_slice(b"op").unwrap(),
            resource: BytesString::from_slice(b"res").unwrap(),
            trace_id: 0x0123456789abcdef,
            span_id: 1,
            parent_id: 0,
            start: 1000,
            duration: 100,
            meta,
            ..Default::default()
        };
        let encoded = to_vec(&[vec![span]], &TracerMetadata::default());

        // Expected 16-byte BE: high = 0x640cfd5400000000, low = 0x0123456789abcdef
        let expected = [
            0x64, 0x0c, 0xfd, 0x54, 0x00, 0x00, 0x00, 0x00, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab,
            0xcd, 0xef,
        ];
        assert!(
            contains_bytes(&encoded, &expected),
            "128-bit trace_id big-endian bytes should appear in payload"
        );
        // _dd.p.tid must not also leak into span attributes.
        assert!(
            !contains_bytes(&encoded, b"_dd.p.tid"),
            "_dd.p.tid should be consumed, not encoded as a span attribute"
        );
    }

    #[test]
    fn test_128bit_trace_id_without_dd_p_tid() {
        // Absent _dd.p.tid → high 64 bits zero.
        let span = make_span("svc", "op", 0x0123456789abcdef, 1, 0);
        let encoded = to_vec(&[vec![span]], &TracerMetadata::default());
        let expected = [
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab,
            0xcd, 0xef,
        ];
        assert!(
            contains_bytes(&encoded, &expected),
            "absent _dd.p.tid should yield zero high 64 bits"
        );
    }

    #[test]
    fn test_sampling_mechanism_negative_value() {
        // `_dd.p.dm` is a signed integer stored as a string (e.g. "-4" → manual rule).
        // The encoder must parse it, take unsigned_abs, and emit it at chunk level.
        let mut meta = HashMap::new();
        meta.insert(
            BytesString::from_static("_dd.p.dm"),
            BytesString::from_static("-4"),
        );
        let root = SpanBytes {
            service: BytesString::from_slice(b"svc").unwrap(),
            name: BytesString::from_slice(b"op").unwrap(),
            resource: BytesString::from_slice(b"res").unwrap(),
            trace_id: 1,
            span_id: 1,
            parent_id: 0,
            start: 1000,
            duration: 100,
            meta,
            ..Default::default()
        };
        let encoded = to_vec(&[vec![root]], &TracerMetadata::default());

        // The chunk-level sampling_mechanism (key 7) must be encoded as uint 4.
        // The byte sequence is `chunk_key::SAMPLING_MECHANISM (0x07)` followed by the
        // msgpack representation of 4 (positive fixint 0x04).
        assert!(
            contains_bytes(&encoded, &[chunk_key::SAMPLING_MECHANISM, 0x04]),
            "sampling_mechanism should be encoded as unsigned_abs(\"-4\") = 4"
        );
    }

    #[test]
    fn test_chunk_attrs_fallback_no_root_span() {
        // Partial flush: no root span (every span has a non-zero parent and no
        // `_dd.top_level`). Values must be accumulated from non-root spans.
        let mut meta1 = HashMap::new();
        meta1.insert(
            BytesString::from_static("_dd.origin"),
            BytesString::from_static("lambda"),
        );
        let mut metrics2 = HashMap::new();
        metrics2.insert(BytesString::from_static("_sampling_priority_v1"), 2.0f64);
        let mut meta3 = HashMap::new();
        meta3.insert(
            BytesString::from_static("_dd.p.dm"),
            BytesString::from_static("-3"),
        );

        let s1 = SpanBytes {
            service: BytesString::from_slice(b"svc").unwrap(),
            name: BytesString::from_slice(b"op1").unwrap(),
            resource: BytesString::from_slice(b"res").unwrap(),
            trace_id: 1,
            span_id: 11,
            parent_id: 10, // non-zero parent → not a root
            start: 1000,
            duration: 100,
            meta: meta1,
            ..Default::default()
        };
        let s2 = SpanBytes {
            service: BytesString::from_slice(b"svc").unwrap(),
            name: BytesString::from_slice(b"op2").unwrap(),
            resource: BytesString::from_slice(b"res").unwrap(),
            trace_id: 1,
            span_id: 12,
            parent_id: 11,
            start: 1000,
            duration: 100,
            metrics: metrics2,
            ..Default::default()
        };
        let s3 = SpanBytes {
            service: BytesString::from_slice(b"svc").unwrap(),
            name: BytesString::from_slice(b"op3").unwrap(),
            resource: BytesString::from_slice(b"res").unwrap(),
            trace_id: 1,
            span_id: 13,
            parent_id: 12,
            start: 1000,
            duration: 100,
            meta: meta3,
            ..Default::default()
        };
        let encoded = to_vec(&[vec![s1, s2, s3]], &TracerMetadata::default());

        // Each attribute must be present at chunk level — collected from a different
        // non-root span.
        assert!(
            contains_bytes(&encoded, b"lambda"),
            "origin 'lambda' from span 1 should appear in payload"
        );
        // priority 2 → msgpack positive fixint 0x02 preceded by PRIORITY key
        assert!(
            contains_bytes(&encoded, &[chunk_key::PRIORITY, 0x02]),
            "sampling_priority 2 from span 2 should appear"
        );
        // sampling_mechanism = unsigned_abs("-3") = 3 → 0x03 preceded by SAMPLING_MECHANISM key
        assert!(
            contains_bytes(&encoded, &[chunk_key::SAMPLING_MECHANISM, 0x03]),
            "sampling_mechanism 3 from span 3 should appear"
        );
    }
}