Skip to main content

dial9_trace_format/
codec.rs

1//! Wire-format encoding and decoding for trace frames.
2//!
3//! This module contains the low-level frame codec. Most users should use
4//! [`Encoder`](crate::encoder::Encoder) and [`Decoder`](crate::decoder::Decoder)
5//! instead. The types [`WireTypeId`], [`PoolEntry`], and [`PoolEntryRef`] are
6//! re-exported here because they appear in the decoder's public API.
7
8use crate::schema::{FieldDef, SchemaEntry};
9use crate::types::{FieldType, FieldValue, FieldValueRef};
10use std::io::{self, Write};
11
/// Identifier of a schema/event type as written on the wire (a `u16` in schema
/// and event frame headers).
///
/// The encoder hands these out sequentially; the decoder learns them from the
/// stream.
///
/// ## Note
/// A wire type id is only stable within a single file. It is not static.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct WireTypeId(pub u16);
19
/// File magic: the ASCII bytes `TRC` followed by a NUL.
pub(crate) const MAGIC: [u8; 4] = *b"TRC\0";
/// Format version byte that follows the magic in the header.
pub(crate) const VERSION: u8 = 1;
/// Header size in bytes: the magic plus the one-byte version.
pub(crate) const HEADER_SIZE: usize = MAGIC.len() + 1;

// Frame tags: the first byte of a frame selects which decoder handles it.
pub(crate) const TAG_SCHEMA: u8 = 0x01;
pub(crate) const TAG_EVENT: u8 = 0x02;
pub(crate) const TAG_STRING_POOL: u8 = 0x03;
pub(crate) const TAG_STACK_POOL: u8 = 0x04;
pub(crate) const TAG_TIMESTAMP_RESET: u8 = 0x05;
// Tag 0x06 is reserved (formerly ProcMaps, now schema-based events).
30
/// Largest nanosecond delta representable in a u24 (3 bytes).
pub(crate) const MAX_TIMESTAMP_DELTA_NS: u64 = (1 << 24) - 1; // 16,777,215

/// Write `value` as a 3-byte little-endian integer (u24).
///
/// The caller must ensure `value <= 0xFF_FFFF`. This is only verified by a
/// debug assertion; without debug assertions the most significant byte of an
/// out-of-range value is silently dropped.
#[inline]
pub(crate) fn encode_u24_le(value: u32, w: &mut impl Write) -> io::Result<()> {
    debug_assert!(u64::from(value) <= MAX_TIMESTAMP_DELTA_NS);
    // Little-endian layout: the three low-order bytes come first.
    let [b0, b1, b2, _] = value.to_le_bytes();
    w.write_all(&[b0, b1, b2])
}
40
/// Read a 3-byte little-endian u24 from the front of `data`.
///
/// Bytes after the first three are ignored. Returns `None` when `data` holds
/// fewer than 3 bytes.
#[inline]
pub(crate) fn decode_u24_le(data: &[u8]) -> Option<u32> {
    match data {
        // Pad with a zero high byte so the value can be read as a regular u32.
        [lo, mid, hi, ..] => Some(u32::from_le_bytes([*lo, *mid, *hi, 0])),
        _ => None,
    }
}
47
/// A string pool entry that owns its bytes.
#[derive(Clone, Debug, PartialEq)]
pub struct PoolEntry {
    /// Identifier the encoder assigned to this entry.
    pub pool_id: u32,
    /// The string's raw bytes.
    pub data: Vec<u8>,
}
56
/// A stack-frame pool entry that owns its addresses. Frames are stored
/// leaf-first as u64 addresses.
#[derive(Clone, Debug, PartialEq)]
pub struct StackPoolEntry {
    /// Identifier the encoder assigned to this entry.
    pub pool_id: u32,
    /// Addresses of the stack frames, leaf first.
    pub frames: Vec<u64>,
}
65
66#[derive(Debug, Clone, PartialEq)]
67pub(crate) enum Frame {
68    Schema {
69        type_id: WireTypeId,
70        entry: SchemaEntry,
71    },
72    Event {
73        type_id: WireTypeId,
74        /// Absolute timestamp in nanoseconds, if the schema has `has_timestamp`.
75        timestamp_ns: Option<u64>,
76        values: Vec<FieldValue>,
77    },
78    StringPool(Vec<PoolEntry>),
79    StackPool(Vec<StackPoolEntry>),
80    TimestampReset(u64),
81}
82
/// A string pool entry that borrows its bytes from the input buffer instead of
/// copying them.
#[derive(Clone, Debug, PartialEq)]
#[non_exhaustive]
pub struct PoolEntryRef<'a> {
    /// Identifier the encoder assigned to this entry.
    pub pool_id: u32,
    /// The string's raw bytes, borrowed from the decode buffer.
    pub data: &'a [u8],
}
92
/// A stack-frame pool entry that borrows its frame bytes from the input buffer
/// instead of copying them.
#[derive(Clone, Debug, PartialEq)]
#[non_exhaustive]
pub struct StackPoolEntryRef<'a> {
    /// Identifier the encoder assigned to this entry.
    pub pool_id: u32,
    /// Frame addresses as raw little-endian u64 bytes, borrowed from the decode
    /// buffer.
    pub frames_le: &'a [u8],
}
102
103impl<'a> StackPoolEntryRef<'a> {
104    /// Number of frames in the entry.
105    pub fn frame_count(&self) -> u32 {
106        (self.frames_le.len() / 8) as u32
107    }
108
109    /// Iterate the stack frame addresses.
110    pub fn iter(&self) -> crate::types::StackFrameIter<'a> {
111        crate::types::StackFrameIter::new(self.frames_le, self.frame_count())
112    }
113
114    /// Collect into an owned [`StackFrames`](crate::types::StackFrames).
115    pub fn to_stack_frames(&self) -> crate::types::StackFrames {
116        self.iter().collect()
117    }
118}
119
120/// Zero-copy frame that borrows from the input buffer.
121#[derive(Debug, Clone, PartialEq)]
122pub(crate) enum FrameRef<'a> {
123    Schema {
124        type_id: WireTypeId,
125        entry: SchemaEntry,
126    },
127    Event {
128        type_id: WireTypeId,
129        timestamp_ns: Option<u64>,
130        values: Vec<FieldValueRef<'a>>,
131    },
132    StringPool(Vec<PoolEntryRef<'a>>),
133    StackPool(Vec<StackPoolEntryRef<'a>>),
134    TimestampReset(u64),
135}
136
/// The parts of a schema the event decoder needs: the raw field type tags and
/// whether events carry a timestamp.
///
/// Tags are kept raw so the optional bit (0x80) is still present when the
/// decoder interprets them.
pub(crate) struct SchemaInfo<'a> {
    /// One raw type tag per field, in wire order.
    pub field_tags: &'a [u8],
    /// Whether each event starts with a u24 timestamp delta.
    pub has_timestamp: bool,
}
143
144// --- Encoding ---
145
146pub(crate) fn encode_header(w: &mut impl Write) -> io::Result<()> {
147    w.write_all(&MAGIC)?;
148    w.write_all(&[VERSION])
149}
150
151pub(crate) fn encode_schema(
152    type_id: WireTypeId,
153    entry: &SchemaEntry,
154    w: &mut impl Write,
155) -> io::Result<()> {
156    w.write_all(&[TAG_SCHEMA])?;
157    w.write_all(&type_id.0.to_le_bytes())?;
158    let name_bytes = entry.name.as_bytes();
159    w.write_all(&(name_bytes.len() as u16).to_le_bytes())?;
160    w.write_all(name_bytes)?;
161    w.write_all(&[if entry.has_timestamp { 1 } else { 0 }])?;
162    w.write_all(&(entry.fields.len() as u16).to_le_bytes())?;
163    for f in &entry.fields {
164        let fname = f.name.as_bytes();
165        w.write_all(&(fname.len() as u16).to_le_bytes())?;
166        w.write_all(fname)?;
167        w.write_all(&[f.field_type as u8])?;
168    }
169    Ok(())
170}
171
/// Encode an event frame: tag, type id (u16 LE), an optional u24 LE timestamp
/// delta, then each value in order.
///
/// Pass `Some` for `timestamp_delta_ns` exactly when the schema has
/// `has_timestamp = true`; the delta is written right after the type id.
#[cfg(test)]
pub(crate) fn encode_event(
    type_id: WireTypeId,
    timestamp_delta_ns: Option<u32>,
    values: &[FieldValue],
    w: &mut impl Write,
) -> io::Result<()> {
    let [id_lo, id_hi] = type_id.0.to_le_bytes();
    w.write_all(&[TAG_EVENT])?;
    w.write_all(&[id_lo, id_hi])?;
    match timestamp_delta_ns {
        Some(delta) => encode_u24_le(delta, w)?,
        None => {}
    }
    for value in values.iter() {
        value.encode(w)?;
    }
    Ok(())
}
191
192pub(crate) fn encode_string_pool(entries: &[PoolEntry], w: &mut impl Write) -> io::Result<()> {
193    w.write_all(&[TAG_STRING_POOL])?;
194    w.write_all(&(entries.len() as u32).to_le_bytes())?;
195    for e in entries {
196        w.write_all(&e.pool_id.to_le_bytes())?;
197        w.write_all(&(e.data.len() as u32).to_le_bytes())?;
198        w.write_all(&e.data)?;
199    }
200    Ok(())
201}
202
203pub(crate) fn encode_stack_pool(entries: &[StackPoolEntry], w: &mut impl Write) -> io::Result<()> {
204    w.write_all(&[TAG_STACK_POOL])?;
205    w.write_all(&(entries.len() as u32).to_le_bytes())?;
206    for e in entries {
207        w.write_all(&e.pool_id.to_le_bytes())?;
208        w.write_all(&(e.frames.len() as u32).to_le_bytes())?;
209        for &addr in &e.frames {
210            w.write_all(&addr.to_le_bytes())?;
211        }
212    }
213    Ok(())
214}
215
216// --- Decoding ---
217
218pub(crate) fn decode_header(data: &[u8]) -> Option<u8> {
219    if data.get(..4)? != MAGIC {
220        return None;
221    }
222    let version = *data.get(4)?;
223    Some(version)
224}
225
226/// Decode a single frame starting at `data`. Returns (Frame, bytes_consumed).
227pub(crate) fn decode_frame<'s>(
228    data: &[u8],
229    schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
230    timestamp_base_ns: u64,
231) -> Option<(Frame, usize)> {
232    let tag = *data.first()?;
233    match tag {
234        TAG_SCHEMA => decode_schema_frame(data),
235        TAG_EVENT => decode_event_frame(data, schema_lookup, timestamp_base_ns),
236        TAG_STRING_POOL => decode_string_pool_frame(data),
237        TAG_STACK_POOL => decode_stack_pool_frame(data),
238        TAG_TIMESTAMP_RESET => {
239            let ts = u64::from_le_bytes(data.get(1..9)?.try_into().ok()?);
240            Some((Frame::TimestampReset(ts), 9))
241        }
242        _ => None,
243    }
244}
245
246fn decode_schema_frame(data: &[u8]) -> Option<(Frame, usize)> {
247    let mut pos = 1; // skip tag
248    let type_id = WireTypeId(u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?));
249    pos += 2;
250    let name_len = u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?) as usize;
251    pos += 2;
252    let name = String::from_utf8(data.get(pos..pos + name_len)?.to_vec()).ok()?;
253    pos += name_len;
254    let has_timestamp = *data.get(pos)? != 0;
255    pos += 1;
256    let field_count = u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?) as usize;
257    pos += 2;
258    let mut fields = Vec::with_capacity(field_count);
259    for _ in 0..field_count {
260        let fname_len = u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?) as usize;
261        pos += 2;
262        let fname = String::from_utf8(data.get(pos..pos + fname_len)?.to_vec()).ok()?;
263        pos += fname_len;
264        let raw_tag = *data.get(pos)?;
265        let ft = FieldType::from_tag(raw_tag)?;
266        pos += 1;
267        fields.push(FieldDef {
268            name: fname,
269            field_type: ft,
270        });
271    }
272    Some((
273        Frame::Schema {
274            type_id,
275            entry: SchemaEntry {
276                name,
277                has_timestamp,
278                fields,
279            },
280        },
281        pos,
282    ))
283}
284
285fn decode_event_frame<'s>(
286    data: &[u8],
287    schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
288    timestamp_base_ns: u64,
289) -> Option<(Frame, usize)> {
290    let mut pos = 1; // skip tag
291    let type_id = WireTypeId(u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?));
292    pos += 2;
293    let info = schema_lookup(type_id)?;
294
295    let timestamp_ns = if info.has_timestamp {
296        let delta = decode_u24_le(&data[pos..])?;
297        pos += 3;
298        Some(timestamp_base_ns.checked_add(delta as u64)?)
299    } else {
300        None
301    };
302
303    let mut values = Vec::with_capacity(info.field_tags.len());
304    let mut remaining = &data[pos..];
305    for &tag in info.field_tags {
306        let ft = FieldType::from_tag(tag)?;
307        if ft.is_optional() {
308            let prefix = *remaining.first()?;
309            remaining = &remaining[1..];
310            if prefix == 0x00 {
311                values.push(FieldValue::None);
312            } else {
313                let (val, rest) = FieldValue::decode(ft.inner(), remaining)?;
314                values.push(val);
315                remaining = rest;
316            }
317        } else {
318            let (val, rest) = FieldValue::decode(ft, remaining)?;
319            values.push(val);
320            remaining = rest;
321        }
322    }
323    let consumed = data.len() - remaining.len();
324    Some((
325        Frame::Event {
326            type_id,
327            timestamp_ns,
328            values,
329        },
330        consumed,
331    ))
332}
333
334fn decode_string_pool_frame(data: &[u8]) -> Option<(Frame, usize)> {
335    let mut pos = 1;
336    let count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
337    pos += 4;
338    let mut entries = Vec::with_capacity(count.min((data.len() - pos) / 8));
339    for _ in 0..count {
340        let pool_id = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?);
341        pos += 4;
342        let len = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
343        pos += 4;
344        let d = data.get(pos..pos + len)?.to_vec();
345        pos += len;
346        entries.push(PoolEntry { pool_id, data: d });
347    }
348    Some((Frame::StringPool(entries), pos))
349}
350
351fn decode_stack_pool_frame(data: &[u8]) -> Option<(Frame, usize)> {
352    let mut pos = 1;
353    let count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
354    pos += 4;
355    let mut entries = Vec::with_capacity(count.min((data.len() - pos) / 16));
356    for _ in 0..count {
357        let pool_id = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?);
358        pos += 4;
359        let frame_count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
360        pos += 4;
361        let bytes = frame_count.checked_mul(8)?;
362        let mut frames = Vec::with_capacity(frame_count);
363        for i in 0..frame_count {
364            let off = pos + i * 8;
365            let addr = u64::from_le_bytes(data.get(off..off + 8)?.try_into().ok()?);
366            frames.push(addr);
367        }
368        pos += bytes;
369        entries.push(StackPoolEntry { pool_id, frames });
370    }
371    Some((Frame::StackPool(entries), pos))
372}
373
374// --- Zero-copy decoding ---
375
376/// Decode a single frame without allocating owned data for field values.
377pub(crate) fn decode_frame_ref<'a, 's>(
378    data: &'a [u8],
379    schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
380    timestamp_base_ns: u64,
381) -> Option<(FrameRef<'a>, usize)> {
382    let tag = *data.first()?;
383    match tag {
384        TAG_SCHEMA => {
385            let (frame, consumed) = decode_schema_frame(data)?;
386            match frame {
387                Frame::Schema { type_id, entry } => {
388                    Some((FrameRef::Schema { type_id, entry }, consumed))
389                }
390                _ => unreachable!(),
391            }
392        }
393        TAG_EVENT => decode_event_frame_ref(data, schema_lookup, timestamp_base_ns),
394        TAG_STRING_POOL => decode_string_pool_frame_ref(data),
395        TAG_STACK_POOL => decode_stack_pool_frame_ref(data),
396        TAG_TIMESTAMP_RESET => {
397            let ts = u64::from_le_bytes(data.get(1..9)?.try_into().ok()?);
398            Some((FrameRef::TimestampReset(ts), 9))
399        }
400        _ => None,
401    }
402}
403
404fn decode_event_frame_ref<'a, 's>(
405    data: &'a [u8],
406    schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
407    timestamp_base_ns: u64,
408) -> Option<(FrameRef<'a>, usize)> {
409    let mut pos = 1;
410    let type_id = WireTypeId(u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?));
411    pos += 2;
412    let info = schema_lookup(type_id)?;
413
414    let timestamp_ns = if info.has_timestamp {
415        let delta = decode_u24_le(&data[pos..])?;
416        pos += 3;
417        Some(timestamp_base_ns.checked_add(delta as u64)?)
418    } else {
419        None
420    };
421
422    let mut values = Vec::with_capacity(info.field_tags.len());
423    for &tag in info.field_tags {
424        let ft = FieldType::from_tag(tag)?;
425        if ft.is_optional() {
426            let prefix = *data.get(pos)?;
427            pos += 1;
428            if prefix == 0x00 {
429                values.push(FieldValueRef::None);
430            } else {
431                let (val, consumed) = FieldValueRef::decode(ft.inner(), data, pos)?;
432                values.push(val);
433                pos += consumed;
434            }
435        } else {
436            let (val, consumed) = FieldValueRef::decode(ft, data, pos)?;
437            values.push(val);
438            pos += consumed;
439        }
440    }
441    Some((
442        FrameRef::Event {
443            type_id,
444            timestamp_ns,
445            values,
446        },
447        pos,
448    ))
449}
450
451fn decode_string_pool_frame_ref<'a>(data: &'a [u8]) -> Option<(FrameRef<'a>, usize)> {
452    let mut pos = 1;
453    let count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
454    pos += 4;
455    let mut entries = Vec::with_capacity(count.min((data.len() - pos) / 8));
456    for _ in 0..count {
457        let pool_id = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?);
458        pos += 4;
459        let len = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
460        pos += 4;
461        let d = data.get(pos..pos + len)?;
462        pos += len;
463        entries.push(PoolEntryRef { pool_id, data: d });
464    }
465    Some((FrameRef::StringPool(entries), pos))
466}
467
468fn decode_stack_pool_frame_ref<'a>(data: &'a [u8]) -> Option<(FrameRef<'a>, usize)> {
469    let mut pos = 1;
470    let count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
471    pos += 4;
472    let mut entries = Vec::with_capacity(count.min((data.len() - pos) / 16));
473    for _ in 0..count {
474        let pool_id = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?);
475        pos += 4;
476        let frame_count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
477        pos += 4;
478        let bytes = frame_count.checked_mul(8)?;
479        let frames_le = data.get(pos..pos + bytes)?;
480        pos += bytes;
481        entries.push(StackPoolEntryRef { pool_id, frames_le });
482    }
483    Some((FrameRef::StackPool(entries), pos))
484}
485
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a schema lookup that knows exactly one type id and answers `None`
    /// for every other id.
    fn single_schema<'t>(
        known: WireTypeId,
        field_tags: &'t [u8],
        has_timestamp: bool,
    ) -> impl Fn(WireTypeId) -> Option<SchemaInfo<'t>> + 't {
        move |id| {
            if id == known {
                Some(SchemaInfo {
                    field_tags,
                    has_timestamp,
                })
            } else {
                None
            }
        }
    }

    // --- Header tests ---

    #[test]
    fn header_encode_decode() {
        let mut buf = Vec::new();
        encode_header(&mut buf).unwrap();
        assert_eq!(buf, [0x54, 0x52, 0x43, 0x00, 1]);
        assert_eq!(decode_header(&buf), Some(1));
    }

    #[test]
    fn header_bad_magic() {
        assert_eq!(decode_header(&[0x00, 0x00, 0x00, 0x00, 1]), None);
    }

    #[test]
    fn header_too_short() {
        assert_eq!(decode_header(&[0x54, 0x52]), None);
    }

    // --- u24 helper tests ---

    #[test]
    fn u24_round_trip() {
        for &value in &[0u32, 1, 0x12_3456, 0xFF_FFFF] {
            let mut buf = Vec::new();
            encode_u24_le(value, &mut buf).unwrap();
            assert_eq!(buf.len(), 3);
            assert_eq!(decode_u24_le(&buf), Some(value));
        }
    }

    #[test]
    fn u24_too_short() {
        assert_eq!(decode_u24_le(&[0x01, 0x02]), None);
    }

    // --- Schema frame tests ---

    #[test]
    fn schema_frame_round_trip() {
        let type_id = WireTypeId(1);
        let entry = SchemaEntry {
            name: "PollStart".into(),
            has_timestamp: true,
            fields: vec![FieldDef {
                name: "worker".into(),
                field_type: FieldType::Varint,
            }],
        };
        let mut buf = Vec::new();
        encode_schema(type_id, &entry, &mut buf).unwrap();
        assert_eq!(buf[0], TAG_SCHEMA);
        let (frame, consumed) = decode_frame(&buf, |_| None, 0).unwrap();
        assert_eq!(consumed, buf.len());
        assert_eq!(frame, Frame::Schema { type_id, entry });
    }

    #[test]
    fn schema_frame_empty_fields() {
        let type_id = WireTypeId(0);
        let entry = SchemaEntry {
            name: "Empty".into(),
            has_timestamp: false,
            fields: vec![],
        };
        let mut buf = Vec::new();
        encode_schema(type_id, &entry, &mut buf).unwrap();
        let (frame, _) = decode_frame(&buf, |_| None, 0).unwrap();
        assert_eq!(frame, Frame::Schema { type_id, entry });
    }

    // --- Event frame tests ---

    #[test]
    fn event_frame_round_trip() {
        let values = vec![
            FieldValue::Varint(12345),
            FieldValue::Bool(true),
            FieldValue::String("hi".to_string()),
        ];
        let mut buf = Vec::new();
        encode_event(WireTypeId(1), None, &values, &mut buf).unwrap();
        assert_eq!(buf[0], TAG_EVENT);

        let tags: Vec<u8> = vec![
            FieldType::Varint as u8,
            FieldType::Bool as u8,
            FieldType::String as u8,
        ];
        let lookup = single_schema(WireTypeId(1), &tags, false);
        let (frame, consumed) = decode_frame(&buf, lookup, 0).unwrap();
        assert_eq!(consumed, buf.len());
        assert_eq!(
            frame,
            Frame::Event {
                type_id: WireTypeId(1),
                timestamp_ns: None,
                values
            }
        );
    }

    #[test]
    fn event_frame_with_timestamp_round_trip() {
        let values = vec![FieldValue::Varint(42)];
        let mut buf = Vec::new();
        encode_event(WireTypeId(1), Some(1_000_000), &values, &mut buf).unwrap();

        let tags: Vec<u8> = vec![FieldType::Varint as u8];
        let lookup = single_schema(WireTypeId(1), &tags, true);
        let (frame, consumed) = decode_frame(&buf, lookup, 5_000_000).unwrap();
        assert_eq!(consumed, buf.len());
        assert_eq!(
            frame,
            Frame::Event {
                type_id: WireTypeId(1),
                timestamp_ns: Some(5_000_000 + 1_000_000),
                values,
            }
        );
    }

    #[test]
    fn event_frame_unknown_type_id() {
        let mut buf = Vec::new();
        encode_event(WireTypeId(99), None, &[FieldValue::Varint(1)], &mut buf).unwrap();
        assert!(decode_frame(&buf, |_| None, 0).is_none());
    }

    #[test]
    fn event_frame_varint_compact() {
        let values = vec![FieldValue::Varint(1_050_000), FieldValue::Varint(3)];
        let mut buf = Vec::new();
        encode_event(WireTypeId(2), None, &values, &mut buf).unwrap();
        assert!(
            buf.len() <= 7,
            "varint PollEnd should be <=7 bytes, got {}",
            buf.len()
        );

        let tags: Vec<u8> = vec![FieldType::Varint as u8, FieldType::Varint as u8];
        let lookup = single_schema(WireTypeId(2), &tags, false);
        let (frame, consumed) = decode_frame(&buf, lookup, 0).unwrap();
        assert_eq!(consumed, buf.len());
        assert_eq!(
            frame,
            Frame::Event {
                type_id: WireTypeId(2),
                timestamp_ns: None,
                values
            }
        );
    }

    // --- String pool frame tests ---

    #[test]
    fn string_pool_round_trip() {
        let entries = vec![
            PoolEntry {
                pool_id: 0,
                data: b"main_thread".to_vec(),
            },
            PoolEntry {
                pool_id: 1,
                data: b"worker-1".to_vec(),
            },
        ];
        let mut buf = Vec::new();
        encode_string_pool(&entries, &mut buf).unwrap();
        assert_eq!(buf[0], TAG_STRING_POOL);
        let (frame, consumed) = decode_frame(&buf, |_| None, 0).unwrap();
        assert_eq!(consumed, buf.len());
        assert_eq!(frame, Frame::StringPool(entries));
    }

    #[test]
    fn string_pool_empty() {
        let mut buf = Vec::new();
        encode_string_pool(&[], &mut buf).unwrap();
        let (frame, _) = decode_frame(&buf, |_| None, 0).unwrap();
        assert_eq!(frame, Frame::StringPool(vec![]));
    }

    #[test]
    fn string_pool_ref_matches_owned() {
        let entries = vec![
            PoolEntry {
                pool_id: 0,
                data: b"main_thread".to_vec(),
            },
            PoolEntry {
                pool_id: 1,
                data: Vec::new(),
            },
        ];
        let mut buf = Vec::new();
        encode_string_pool(&entries, &mut buf).unwrap();
        let (frame, consumed) = decode_frame_ref(&buf, |_| None, 0).unwrap();
        assert_eq!(consumed, buf.len());
        match frame {
            FrameRef::StringPool(refs) => {
                assert_eq!(refs.len(), entries.len());
                for (r, e) in refs.iter().zip(&entries) {
                    assert_eq!(r.pool_id, e.pool_id);
                    assert_eq!(r.data, e.data.as_slice());
                }
            }
            other => panic!("expected StringPool, got {:?}", other),
        }
    }

    // --- Stack pool frame tests ---

    #[test]
    fn stack_pool_round_trip() {
        let entries = vec![
            StackPoolEntry {
                pool_id: 0,
                frames: vec![0x1000, 0x2000, 0x3000],
            },
            StackPoolEntry {
                pool_id: 1,
                frames: vec![],
            },
        ];
        let mut buf = Vec::new();
        encode_stack_pool(&entries, &mut buf).unwrap();
        assert_eq!(buf[0], TAG_STACK_POOL);
        let (frame, consumed) = decode_frame(&buf, |_| None, 0).unwrap();
        assert_eq!(consumed, buf.len());
        assert_eq!(frame, Frame::StackPool(entries));
    }

    #[test]
    fn stack_pool_ref_matches_owned() {
        let entries = vec![StackPoolEntry {
            pool_id: 7,
            frames: vec![0xDEAD_BEEF, 0x1234_5678_9ABC_DEF0],
        }];
        let mut buf = Vec::new();
        encode_stack_pool(&entries, &mut buf).unwrap();
        let (frame, consumed) = decode_frame_ref(&buf, |_| None, 0).unwrap();
        assert_eq!(consumed, buf.len());
        match frame {
            FrameRef::StackPool(refs) => {
                assert_eq!(refs.len(), 1);
                let mut expected = Vec::new();
                for addr in &entries[0].frames {
                    expected.extend_from_slice(&addr.to_le_bytes());
                }
                assert_eq!(refs[0].pool_id, 7);
                assert_eq!(refs[0].frame_count(), 2);
                assert_eq!(refs[0].frames_le, expected.as_slice());
            }
            other => panic!("expected StackPool, got {:?}", other),
        }
    }

    #[test]
    fn truncated_stack_pool_frame() {
        let entry = StackPoolEntry {
            pool_id: 7,
            frames: vec![1, 2],
        };
        let mut buf = Vec::new();
        encode_stack_pool(&[entry], &mut buf).unwrap();
        buf.pop(); // cut the last frame short
        assert!(decode_frame(&buf, |_| None, 0).is_none());
        assert!(decode_frame_ref(&buf, |_| None, 0).is_none());
    }

    // --- Timestamp reset frame tests ---

    #[test]
    fn timestamp_reset_round_trip() {
        let mut buf = vec![TAG_TIMESTAMP_RESET];
        buf.extend_from_slice(&123_456_789u64.to_le_bytes());
        let (frame, consumed) = decode_frame(&buf, |_| None, 0).unwrap();
        assert_eq!(consumed, 9);
        assert_eq!(frame, Frame::TimestampReset(123_456_789));
        let (frame_ref, consumed_ref) = decode_frame_ref(&buf, |_| None, 0).unwrap();
        assert_eq!(consumed_ref, 9);
        assert_eq!(frame_ref, FrameRef::TimestampReset(123_456_789));
    }

    #[test]
    fn truncated_timestamp_reset_frame() {
        let mut buf = vec![TAG_TIMESTAMP_RESET];
        buf.extend_from_slice(&[0u8; 7]);
        assert!(decode_frame(&buf, |_| None, 0).is_none());
    }

    // --- Malformed input tests ---

    #[test]
    fn unknown_tag_returns_none() {
        assert!(decode_frame(&[0xFF], |_| None, 0).is_none());
    }

    #[test]
    fn truncated_event_frame() {
        let tags: Vec<u8> = vec![FieldType::Varint as u8];
        let data = [TAG_EVENT, 0x01];
        let result = decode_frame(
            &data,
            |_| {
                Some(SchemaInfo {
                    field_tags: &tags,
                    has_timestamp: false,
                })
            },
            0,
        );
        assert!(result.is_none());
    }

    #[test]
    fn truncated_schema_frame() {
        let data = [TAG_SCHEMA, 0x00, 0x00];
        let result = decode_frame(&data, |_: WireTypeId| None, 0);
        assert!(result.is_none());
    }
}
713}