Skip to main content

dial9_trace_format/
codec.rs

1//! Wire-format encoding and decoding for trace frames.
2//!
3//! This module contains the low-level frame codec. Most users should use
4//! [`Encoder`](crate::encoder::Encoder) and [`Decoder`](crate::decoder::Decoder)
5//! instead. The types [`WireTypeId`], [`PoolEntry`], and [`PoolEntryRef`] are
6//! re-exported here because they appear in the decoder's public API.
7
8use crate::schema::{FieldAnnotation, FieldDef, SchemaEntry};
9use crate::types::{FieldType, FieldValue, FieldValueRef};
10use std::io::{self, Write};
11
12/// Type ID as it appears on the wire (u16 in schema/event frame headers).
13/// Assigned sequentially by the encoder; the decoder reads them from the stream.
14///
15/// ## Note
16/// The wire type id is only stable within a single file. It is not static.
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
18pub struct WireTypeId(pub u16);
19
20pub(crate) const MAGIC: [u8; 4] = [0x54, 0x52, 0x43, 0x00]; // TRC\0
21pub(crate) const VERSION: u8 = 1;
22pub(crate) const HEADER_SIZE: usize = 5;
23
24pub(crate) const TAG_SCHEMA: u8 = 0x01;
25pub(crate) const TAG_EVENT: u8 = 0x02;
26pub(crate) const TAG_STRING_POOL: u8 = 0x03;
27pub(crate) const TAG_STACK_POOL: u8 = 0x04;
28pub(crate) const TAG_TIMESTAMP_RESET: u8 = 0x05;
29pub(crate) const TAG_SCHEMA_ANNOTATIONS: u8 = 0x06;
30
31/// Maximum nanosecond delta that fits in a u24 (3 bytes).
32pub(crate) const MAX_TIMESTAMP_DELTA_NS: u64 = 0xFF_FFFF; // 16,777,215
33
34/// Encode a u32 value as 3-byte little-endian (u24). Caller must ensure `value <= 0xFF_FFFF`.
35#[inline]
36pub(crate) fn encode_u24_le(value: u32, w: &mut impl Write) -> io::Result<()> {
37    debug_assert!(value <= MAX_TIMESTAMP_DELTA_NS as u32);
38    w.write_all(&[value as u8, (value >> 8) as u8, (value >> 16) as u8])
39}
40
41/// Decode a 3-byte little-endian u24 from `data`. Returns `None` if fewer than 3 bytes.
42#[inline]
43pub(crate) fn decode_u24_le(data: &[u8]) -> Option<u32> {
44    let b = data.get(..3)?;
45    Some(b[0] as u32 | (b[1] as u32) << 8 | (b[2] as u32) << 16)
46}
47
48/// An owned string pool entry.
49#[derive(Debug, Clone, PartialEq)]
50pub struct PoolEntry {
51    /// Pool ID assigned by the encoder.
52    pub pool_id: u32,
53    /// Raw string data.
54    pub data: Vec<u8>,
55}
56
57/// An owned stack-frame pool entry. Frames are leaf-first u64 addresses.
58#[derive(Debug, Clone, PartialEq)]
59pub struct StackPoolEntry {
60    /// Pool ID assigned by the encoder.
61    pub pool_id: u32,
62    /// Stack frame addresses (leaf-first).
63    pub frames: Vec<u64>,
64}
65
66#[derive(Debug, Clone, PartialEq)]
67pub(crate) enum Frame {
68    Schema {
69        type_id: WireTypeId,
70        entry: SchemaEntry,
71    },
72    Event {
73        type_id: WireTypeId,
74        /// Absolute timestamp in nanoseconds, if the schema has `has_timestamp`.
75        timestamp_ns: Option<u64>,
76        values: Vec<FieldValue>,
77    },
78    StringPool(Vec<PoolEntry>),
79    StackPool(Vec<StackPoolEntry>),
80    TimestampReset(u64),
81    SchemaAnnotations {
82        type_id: WireTypeId,
83        annotations: Vec<FieldAnnotation>,
84    },
85}
86
87/// Zero-copy pool entry borrowing from the input buffer.
88#[non_exhaustive]
89#[derive(Debug, Clone, PartialEq)]
90pub struct PoolEntryRef<'a> {
91    /// Pool ID assigned by the encoder.
92    pub pool_id: u32,
93    /// Raw string data borrowed from the decode buffer.
94    pub data: &'a [u8],
95}
96
97/// Zero-copy stack-frame pool entry borrowing from the input buffer.
98#[non_exhaustive]
99#[derive(Debug, Clone, PartialEq)]
100pub struct StackPoolEntryRef<'a> {
101    /// Pool ID assigned by the encoder.
102    pub pool_id: u32,
103    /// Raw u64-LE bytes borrowed from the decode buffer.
104    pub frames_le: &'a [u8],
105}
106
107impl<'a> StackPoolEntryRef<'a> {
108    /// Number of frames in the entry.
109    pub fn frame_count(&self) -> u32 {
110        (self.frames_le.len() / 8) as u32
111    }
112
113    /// Iterate the stack frame addresses.
114    pub fn iter(&self) -> crate::types::StackFrameIter<'a> {
115        crate::types::StackFrameIter::new(self.frames_le, self.frame_count())
116    }
117
118    /// Collect into an owned [`StackFrames`](crate::types::StackFrames).
119    pub fn to_stack_frames(&self) -> crate::types::StackFrames {
120        self.iter().collect()
121    }
122}
123
124/// Zero-copy frame that borrows from the input buffer.
125#[derive(Debug, Clone, PartialEq)]
126pub(crate) enum FrameRef<'a> {
127    Schema {
128        type_id: WireTypeId,
129        entry: SchemaEntry,
130    },
131    Event {
132        type_id: WireTypeId,
133        timestamp_ns: Option<u64>,
134        values: Vec<FieldValueRef<'a>>,
135    },
136    StringPool(Vec<PoolEntryRef<'a>>),
137    StackPool(Vec<StackPoolEntryRef<'a>>),
138    TimestampReset(u64),
139    SchemaAnnotations {
140        type_id: WireTypeId,
141        annotations: Vec<FieldAnnotation>,
142    },
143}
144
145/// Schema info needed by the decoder: raw field type tags + has_timestamp flag.
146/// Raw tags preserve the optional bit (0x80) for correct decode handling.
147pub(crate) struct SchemaInfo<'a> {
148    pub field_tags: &'a [u8],
149    pub has_timestamp: bool,
150}
151
152// --- Encoding ---
153
154pub(crate) fn encode_header(w: &mut impl Write) -> io::Result<()> {
155    w.write_all(&MAGIC)?;
156    w.write_all(&[VERSION])
157}
158
159pub(crate) fn encode_schema(
160    type_id: WireTypeId,
161    entry: &SchemaEntry,
162    w: &mut impl Write,
163) -> io::Result<()> {
164    w.write_all(&[TAG_SCHEMA])?;
165    w.write_all(&type_id.0.to_le_bytes())?;
166    let name_bytes = entry.name.as_bytes();
167    w.write_all(&(name_bytes.len() as u16).to_le_bytes())?;
168    w.write_all(name_bytes)?;
169    w.write_all(&[if entry.has_timestamp { 1 } else { 0 }])?;
170    w.write_all(&(entry.fields.len() as u16).to_le_bytes())?;
171    for f in &entry.fields {
172        let fname = f.name.as_bytes();
173        w.write_all(&(fname.len() as u16).to_le_bytes())?;
174        w.write_all(fname)?;
175        w.write_all(&[f.field_type as u8])?;
176    }
177    Ok(())
178}
179
180/// Encode an event frame. If `timestamp_delta_ns` is Some, writes a u24 LE delta
181/// after the type_id (for schemas with `has_timestamp = true`).
182#[cfg(test)]
183pub(crate) fn encode_event(
184    type_id: WireTypeId,
185    timestamp_delta_ns: Option<u32>,
186    values: &[FieldValue],
187    w: &mut impl Write,
188) -> io::Result<()> {
189    w.write_all(&[TAG_EVENT])?;
190    w.write_all(&type_id.0.to_le_bytes())?;
191    if let Some(delta) = timestamp_delta_ns {
192        encode_u24_le(delta, w)?;
193    }
194    for v in values {
195        v.encode(w)?;
196    }
197    Ok(())
198}
199
200pub(crate) fn encode_string_pool(entries: &[PoolEntry], w: &mut impl Write) -> io::Result<()> {
201    w.write_all(&[TAG_STRING_POOL])?;
202    w.write_all(&(entries.len() as u32).to_le_bytes())?;
203    for e in entries {
204        w.write_all(&e.pool_id.to_le_bytes())?;
205        w.write_all(&(e.data.len() as u32).to_le_bytes())?;
206        w.write_all(&e.data)?;
207    }
208    Ok(())
209}
210
211pub(crate) fn encode_stack_pool(entries: &[StackPoolEntry], w: &mut impl Write) -> io::Result<()> {
212    w.write_all(&[TAG_STACK_POOL])?;
213    w.write_all(&(entries.len() as u32).to_le_bytes())?;
214    for e in entries {
215        w.write_all(&e.pool_id.to_le_bytes())?;
216        w.write_all(&(e.frames.len() as u32).to_le_bytes())?;
217        for &addr in &e.frames {
218            w.write_all(&addr.to_le_bytes())?;
219        }
220    }
221    Ok(())
222}
223
224pub(crate) fn encode_schema_annotations(
225    type_id: WireTypeId,
226    annotations: &[FieldAnnotation],
227    w: &mut impl Write,
228) -> io::Result<()> {
229    let count: u16 = annotations.len().try_into().map_err(|_| {
230        io::Error::new(
231            io::ErrorKind::InvalidInput,
232            "annotation count exceeds u16::MAX",
233        )
234    })?;
235    w.write_all(&[TAG_SCHEMA_ANNOTATIONS])?;
236    crate::leb128::encode_unsigned(type_id.0 as u64, w)?;
237    w.write_all(&count.to_le_bytes())?;
238    for a in annotations {
239        w.write_all(&a.field_index().to_le_bytes())?;
240        let key_bytes = a.key().as_bytes();
241        w.write_all(&(key_bytes.len() as u16).to_le_bytes())?;
242        w.write_all(key_bytes)?;
243        let value_bytes = a.value().as_bytes();
244        w.write_all(&(value_bytes.len() as u32).to_le_bytes())?;
245        w.write_all(value_bytes)?;
246    }
247    Ok(())
248}
249
250// --- Decoding ---
251
252pub(crate) fn decode_header(data: &[u8]) -> Option<u8> {
253    if data.get(..4)? != MAGIC {
254        return None;
255    }
256    let version = *data.get(4)?;
257    Some(version)
258}
259
260/// Decode a single frame starting at `data`. Returns (Frame, bytes_consumed).
261pub(crate) fn decode_frame<'s>(
262    data: &[u8],
263    schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
264    timestamp_base_ns: u64,
265) -> Option<(Frame, usize)> {
266    let tag = *data.first()?;
267    match tag {
268        TAG_SCHEMA => decode_schema_frame(data),
269        TAG_EVENT => decode_event_frame(data, schema_lookup, timestamp_base_ns),
270        TAG_STRING_POOL => decode_string_pool_frame(data),
271        TAG_STACK_POOL => decode_stack_pool_frame(data),
272        TAG_TIMESTAMP_RESET => {
273            let ts = u64::from_le_bytes(data.get(1..9)?.try_into().ok()?);
274            Some((Frame::TimestampReset(ts), 9))
275        }
276        TAG_SCHEMA_ANNOTATIONS => decode_schema_annotations_frame(data),
277        _ => None,
278    }
279}
280
281fn decode_schema_frame(data: &[u8]) -> Option<(Frame, usize)> {
282    let mut pos = 1; // skip tag
283    let type_id = WireTypeId(u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?));
284    pos += 2;
285    let name_len = u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?) as usize;
286    pos += 2;
287    let name = String::from_utf8(data.get(pos..pos + name_len)?.to_vec()).ok()?;
288    pos += name_len;
289    let has_timestamp = *data.get(pos)? != 0;
290    pos += 1;
291    let field_count = u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?) as usize;
292    pos += 2;
293    let mut fields = Vec::with_capacity(field_count);
294    for _ in 0..field_count {
295        let fname_len = u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?) as usize;
296        pos += 2;
297        let fname = String::from_utf8(data.get(pos..pos + fname_len)?.to_vec()).ok()?;
298        pos += fname_len;
299        let raw_tag = *data.get(pos)?;
300        let ft = FieldType::from_tag(raw_tag)?;
301        pos += 1;
302        fields.push(FieldDef {
303            name: fname,
304            field_type: ft,
305        });
306    }
307    Some((
308        Frame::Schema {
309            type_id,
310            entry: SchemaEntry {
311                name,
312                has_timestamp,
313                fields,
314                annotations: Vec::new(),
315            },
316        },
317        pos,
318    ))
319}
320
321fn decode_event_frame<'s>(
322    data: &[u8],
323    schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
324    timestamp_base_ns: u64,
325) -> Option<(Frame, usize)> {
326    let mut pos = 1; // skip tag
327    let type_id = WireTypeId(u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?));
328    pos += 2;
329    let info = schema_lookup(type_id)?;
330
331    let timestamp_ns = if info.has_timestamp {
332        let delta = decode_u24_le(&data[pos..])?;
333        pos += 3;
334        Some(timestamp_base_ns.checked_add(delta as u64)?)
335    } else {
336        None
337    };
338
339    let mut values = Vec::with_capacity(info.field_tags.len());
340    let mut remaining = &data[pos..];
341    for &tag in info.field_tags {
342        let ft = FieldType::from_tag(tag)?;
343        if ft.is_optional() {
344            let prefix = *remaining.first()?;
345            remaining = &remaining[1..];
346            if prefix == 0x00 {
347                values.push(FieldValue::None);
348            } else {
349                let (val, rest) = FieldValue::decode(ft.inner(), remaining)?;
350                values.push(val);
351                remaining = rest;
352            }
353        } else {
354            let (val, rest) = FieldValue::decode(ft, remaining)?;
355            values.push(val);
356            remaining = rest;
357        }
358    }
359    let consumed = data.len() - remaining.len();
360    Some((
361        Frame::Event {
362            type_id,
363            timestamp_ns,
364            values,
365        },
366        consumed,
367    ))
368}
369
370fn decode_string_pool_frame(data: &[u8]) -> Option<(Frame, usize)> {
371    let mut pos = 1;
372    let count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
373    pos += 4;
374    let mut entries = Vec::with_capacity(count.min((data.len() - pos) / 8));
375    for _ in 0..count {
376        let pool_id = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?);
377        pos += 4;
378        let len = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
379        pos += 4;
380        let d = data.get(pos..pos + len)?.to_vec();
381        pos += len;
382        entries.push(PoolEntry { pool_id, data: d });
383    }
384    Some((Frame::StringPool(entries), pos))
385}
386
387fn decode_stack_pool_frame(data: &[u8]) -> Option<(Frame, usize)> {
388    let mut pos = 1;
389    let count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
390    pos += 4;
391    let mut entries = Vec::with_capacity(count.min((data.len() - pos) / 16));
392    for _ in 0..count {
393        let pool_id = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?);
394        pos += 4;
395        let frame_count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
396        pos += 4;
397        let bytes = frame_count.checked_mul(8)?;
398        let mut frames = Vec::with_capacity(frame_count);
399        for i in 0..frame_count {
400            let off = pos + i * 8;
401            let addr = u64::from_le_bytes(data.get(off..off + 8)?.try_into().ok()?);
402            frames.push(addr);
403        }
404        pos += bytes;
405        entries.push(StackPoolEntry { pool_id, frames });
406    }
407    Some((Frame::StackPool(entries), pos))
408}
409
410fn decode_schema_annotations_frame(data: &[u8]) -> Option<(Frame, usize)> {
411    let mut pos = 1; // skip tag
412    let (type_id_raw, consumed) = crate::leb128::decode_unsigned(&data[pos..])?;
413    let type_id = WireTypeId(type_id_raw as u16);
414    pos += consumed;
415    let count = u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?) as usize;
416    pos += 2;
417    let mut annotations = Vec::with_capacity(count);
418    for _ in 0..count {
419        let field_index = u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?);
420        pos += 2;
421        let key_len = u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?) as usize;
422        pos += 2;
423        let key = String::from_utf8(data.get(pos..pos + key_len)?.to_vec()).ok()?;
424        pos += key_len;
425        let value_len = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
426        pos += 4;
427        let value = String::from_utf8(data.get(pos..pos + value_len)?.to_vec()).ok()?;
428        pos += value_len;
429        annotations.push(FieldAnnotation::new(field_index, key, value));
430    }
431    Some((
432        Frame::SchemaAnnotations {
433            type_id,
434            annotations,
435        },
436        pos,
437    ))
438}
439
440// --- Zero-copy decoding ---
441
442/// Decode a single frame without allocating owned data for field values.
443pub(crate) fn decode_frame_ref<'a, 's>(
444    data: &'a [u8],
445    schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
446    timestamp_base_ns: u64,
447) -> Option<(FrameRef<'a>, usize)> {
448    let tag = *data.first()?;
449    match tag {
450        TAG_SCHEMA => {
451            let (frame, consumed) = decode_schema_frame(data)?;
452            match frame {
453                Frame::Schema { type_id, entry } => {
454                    Some((FrameRef::Schema { type_id, entry }, consumed))
455                }
456                _ => unreachable!(),
457            }
458        }
459        TAG_EVENT => decode_event_frame_ref(data, schema_lookup, timestamp_base_ns),
460        TAG_STRING_POOL => decode_string_pool_frame_ref(data),
461        TAG_STACK_POOL => decode_stack_pool_frame_ref(data),
462        TAG_TIMESTAMP_RESET => {
463            let ts = u64::from_le_bytes(data.get(1..9)?.try_into().ok()?);
464            Some((FrameRef::TimestampReset(ts), 9))
465        }
466        TAG_SCHEMA_ANNOTATIONS => {
467            let (frame, consumed) = decode_schema_annotations_frame(data)?;
468            match frame {
469                Frame::SchemaAnnotations {
470                    type_id,
471                    annotations,
472                } => Some((
473                    FrameRef::SchemaAnnotations {
474                        type_id,
475                        annotations,
476                    },
477                    consumed,
478                )),
479                _ => unreachable!(),
480            }
481        }
482        _ => None,
483    }
484}
485
486fn decode_event_frame_ref<'a, 's>(
487    data: &'a [u8],
488    schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
489    timestamp_base_ns: u64,
490) -> Option<(FrameRef<'a>, usize)> {
491    let mut pos = 1;
492    let type_id = WireTypeId(u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?));
493    pos += 2;
494    let info = schema_lookup(type_id)?;
495
496    let timestamp_ns = if info.has_timestamp {
497        let delta = decode_u24_le(&data[pos..])?;
498        pos += 3;
499        Some(timestamp_base_ns.checked_add(delta as u64)?)
500    } else {
501        None
502    };
503
504    let mut values = Vec::with_capacity(info.field_tags.len());
505    for &tag in info.field_tags {
506        let ft = FieldType::from_tag(tag)?;
507        if ft.is_optional() {
508            let prefix = *data.get(pos)?;
509            pos += 1;
510            if prefix == 0x00 {
511                values.push(FieldValueRef::None);
512            } else {
513                let (val, consumed) = FieldValueRef::decode(ft.inner(), data, pos)?;
514                values.push(val);
515                pos += consumed;
516            }
517        } else {
518            let (val, consumed) = FieldValueRef::decode(ft, data, pos)?;
519            values.push(val);
520            pos += consumed;
521        }
522    }
523    Some((
524        FrameRef::Event {
525            type_id,
526            timestamp_ns,
527            values,
528        },
529        pos,
530    ))
531}
532
533fn decode_string_pool_frame_ref<'a>(data: &'a [u8]) -> Option<(FrameRef<'a>, usize)> {
534    let mut pos = 1;
535    let count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
536    pos += 4;
537    let mut entries = Vec::with_capacity(count.min((data.len() - pos) / 8));
538    for _ in 0..count {
539        let pool_id = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?);
540        pos += 4;
541        let len = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
542        pos += 4;
543        let d = data.get(pos..pos + len)?;
544        pos += len;
545        entries.push(PoolEntryRef { pool_id, data: d });
546    }
547    Some((FrameRef::StringPool(entries), pos))
548}
549
550fn decode_stack_pool_frame_ref<'a>(data: &'a [u8]) -> Option<(FrameRef<'a>, usize)> {
551    let mut pos = 1;
552    let count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
553    pos += 4;
554    let mut entries = Vec::with_capacity(count.min((data.len() - pos) / 16));
555    for _ in 0..count {
556        let pool_id = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?);
557        pos += 4;
558        let frame_count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
559        pos += 4;
560        let bytes = frame_count.checked_mul(8)?;
561        let frames_le = data.get(pos..pos + bytes)?;
562        pos += bytes;
563        entries.push(StackPoolEntryRef { pool_id, frames_le });
564    }
565    Some((FrameRef::StackPool(entries), pos))
566}
567
568#[cfg(test)]
569mod tests {
570    use super::*;
571
572    // --- Header tests ---
573
574    #[test]
575    fn header_encode_decode() {
576        let mut buf = Vec::new();
577        encode_header(&mut buf).unwrap();
578        assert_eq!(buf, [0x54, 0x52, 0x43, 0x00, 1]);
579        assert_eq!(decode_header(&buf), Some(1));
580    }
581
582    #[test]
583    fn header_bad_magic() {
584        assert_eq!(decode_header(&[0x00, 0x00, 0x00, 0x00, 1]), None);
585    }
586
587    #[test]
588    fn header_too_short() {
589        assert_eq!(decode_header(&[0x54, 0x52]), None);
590    }
591
592    // --- Schema frame tests ---
593
594    #[test]
595    fn schema_frame_round_trip() {
596        let type_id = WireTypeId(1);
597        let entry = SchemaEntry {
598            name: "PollStart".into(),
599            has_timestamp: true,
600            fields: vec![FieldDef {
601                name: "worker".into(),
602                field_type: FieldType::Varint,
603            }],
604            annotations: Vec::new(),
605        };
606        let mut buf = Vec::new();
607        encode_schema(type_id, &entry, &mut buf).unwrap();
608        assert_eq!(buf[0], TAG_SCHEMA);
609        let (frame, consumed) = decode_frame(&buf, |_| None, 0).unwrap();
610        assert_eq!(consumed, buf.len());
611        assert_eq!(frame, Frame::Schema { type_id, entry });
612    }
613
614    #[test]
615    fn schema_frame_empty_fields() {
616        let type_id = WireTypeId(0);
617        let entry = SchemaEntry {
618            name: "Empty".into(),
619            has_timestamp: false,
620            fields: vec![],
621            annotations: Vec::new(),
622        };
623        let mut buf = Vec::new();
624        encode_schema(type_id, &entry, &mut buf).unwrap();
625        let (frame, _) = decode_frame(&buf, |_| None, 0).unwrap();
626        assert_eq!(frame, Frame::Schema { type_id, entry });
627    }
628
629    // --- Event frame tests ---
630
631    #[test]
632    fn event_frame_round_trip() {
633        let values = vec![
634            FieldValue::Varint(12345),
635            FieldValue::Bool(true),
636            FieldValue::String("hi".to_string()),
637        ];
638        let mut buf = Vec::new();
639        encode_event(WireTypeId(1), None, &values, &mut buf).unwrap();
640        assert_eq!(buf[0], TAG_EVENT);
641
642        let tags: Vec<u8> = vec![
643            FieldType::Varint as u8,
644            FieldType::Bool as u8,
645            FieldType::String as u8,
646        ];
647        let lookup = |id: WireTypeId| -> Option<SchemaInfo<'_>> {
648            if id == WireTypeId(1) {
649                Some(SchemaInfo {
650                    field_tags: &tags,
651                    has_timestamp: false,
652                })
653            } else {
654                None
655            }
656        };
657        let (frame, consumed) = decode_frame(&buf, lookup, 0).unwrap();
658        assert_eq!(consumed, buf.len());
659        assert_eq!(
660            frame,
661            Frame::Event {
662                type_id: WireTypeId(1),
663                timestamp_ns: None,
664                values
665            }
666        );
667    }
668
669    #[test]
670    fn event_frame_with_timestamp_round_trip() {
671        let values = vec![FieldValue::Varint(42)];
672        let mut buf = Vec::new();
673        encode_event(WireTypeId(1), Some(1_000_000), &values, &mut buf).unwrap();
674
675        let tags: Vec<u8> = vec![FieldType::Varint as u8];
676        let lookup = |id: WireTypeId| -> Option<SchemaInfo<'_>> {
677            if id == WireTypeId(1) {
678                Some(SchemaInfo {
679                    field_tags: &tags,
680                    has_timestamp: true,
681                })
682            } else {
683                None
684            }
685        };
686        let (frame, consumed) = decode_frame(&buf, lookup, 5_000_000).unwrap();
687        assert_eq!(consumed, buf.len());
688        assert_eq!(
689            frame,
690            Frame::Event {
691                type_id: WireTypeId(1),
692                timestamp_ns: Some(5_000_000 + 1_000_000),
693                values,
694            }
695        );
696    }
697
698    #[test]
699    fn event_frame_unknown_type_id() {
700        let mut buf = Vec::new();
701        encode_event(WireTypeId(99), None, &[FieldValue::Varint(1)], &mut buf).unwrap();
702        assert!(decode_frame(&buf, |_| None, 0).is_none());
703    }
704
705    #[test]
706    fn event_frame_varint_compact() {
707        let values = vec![FieldValue::Varint(1_050_000), FieldValue::Varint(3)];
708        let mut buf = Vec::new();
709        encode_event(WireTypeId(2), None, &values, &mut buf).unwrap();
710        assert!(
711            buf.len() <= 7,
712            "varint PollEnd should be <=7 bytes, got {}",
713            buf.len()
714        );
715
716        let tags: Vec<u8> = vec![FieldType::Varint as u8, FieldType::Varint as u8];
717        let lookup = |id: WireTypeId| -> Option<SchemaInfo<'_>> {
718            if id == WireTypeId(2) {
719                Some(SchemaInfo {
720                    field_tags: &tags,
721                    has_timestamp: false,
722                })
723            } else {
724                None
725            }
726        };
727        let (frame, consumed) = decode_frame(&buf, lookup, 0).unwrap();
728        assert_eq!(consumed, buf.len());
729        assert_eq!(
730            frame,
731            Frame::Event {
732                type_id: WireTypeId(2),
733                timestamp_ns: None,
734                values
735            }
736        );
737    }
738
739    // --- String pool frame tests ---
740
741    #[test]
742    fn string_pool_round_trip() {
743        let entries = vec![
744            PoolEntry {
745                pool_id: 0,
746                data: b"main_thread".to_vec(),
747            },
748            PoolEntry {
749                pool_id: 1,
750                data: b"worker-1".to_vec(),
751            },
752        ];
753        let mut buf = Vec::new();
754        encode_string_pool(&entries, &mut buf).unwrap();
755        assert_eq!(buf[0], TAG_STRING_POOL);
756        let (frame, consumed) = decode_frame(&buf, |_| None, 0).unwrap();
757        assert_eq!(consumed, buf.len());
758        assert_eq!(frame, Frame::StringPool(entries));
759    }
760
761    #[test]
762    fn string_pool_empty() {
763        let mut buf = Vec::new();
764        encode_string_pool(&[], &mut buf).unwrap();
765        let (frame, _) = decode_frame(&buf, |_| None, 0).unwrap();
766        assert_eq!(frame, Frame::StringPool(vec![]));
767    }
768
769    #[test]
770    fn unknown_tag_returns_none() {
771        assert!(decode_frame(&[0xFF], |_| None, 0).is_none());
772    }
773
774    #[test]
775    fn truncated_event_frame() {
776        let tags: Vec<u8> = vec![FieldType::Varint as u8];
777        let data = [TAG_EVENT, 0x01];
778        let result = decode_frame(
779            &data,
780            |_| {
781                Some(SchemaInfo {
782                    field_tags: &tags,
783                    has_timestamp: false,
784                })
785            },
786            0,
787        );
788        assert!(result.is_none());
789    }
790
791    #[test]
792    fn truncated_schema_frame() {
793        let data = [TAG_SCHEMA, 0x00, 0x00];
794        let result = decode_frame(&data, |_: WireTypeId| None, 0);
795        assert!(result.is_none());
796    }
797}