Skip to main content

dial9_trace_format/
codec.rs

1//! Wire-format encoding and decoding for trace frames.
2//!
3//! This module contains the low-level frame codec. Most users should use
4//! [`Encoder`](crate::encoder::Encoder) and [`Decoder`](crate::decoder::Decoder)
5//! instead. The types [`WireTypeId`], [`PoolEntry`], and [`PoolEntryRef`] are
6//! re-exported here because they appear in the decoder's public API.
7
8use crate::schema::{FieldDef, SchemaEntry};
9use crate::types::{FieldType, FieldValue, FieldValueRef};
10use std::io::{self, Write};
11
/// Identifier stored in the u16 type-id slot of schema and event frame headers.
///
/// The encoder hands these out sequentially; the decoder learns them by reading
/// the stream.
///
/// ## Note
/// A wire type id is only stable within a single file. It is not a static,
/// cross-file identifier.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub struct WireTypeId(pub u16);
19
/// File magic: the ASCII bytes `TRC` followed by a NUL byte.
pub(crate) const MAGIC: [u8; 4] = *b"TRC\0";
/// Format version byte written directly after the magic.
pub(crate) const VERSION: u8 = 1;
/// Header length in bytes: the magic plus the single version byte.
pub(crate) const HEADER_SIZE: usize = MAGIC.len() + 1;

/// Frame tag for a schema frame.
pub(crate) const TAG_SCHEMA: u8 = 0x01;
/// Frame tag for an event frame.
pub(crate) const TAG_EVENT: u8 = 0x02;
/// Frame tag for a string pool frame.
pub(crate) const TAG_STRING_POOL: u8 = 0x03;
// Tags 0x04 and 0x06 stay reserved: they used to be SymbolTable and ProcMaps,
// which are now expressed as schema-based events.
/// Frame tag for a timestamp reset frame.
pub(crate) const TAG_TIMESTAMP_RESET: u8 = 0x05;

/// Largest nanosecond delta that fits in the 3-byte (u24) delta field.
pub(crate) const MAX_TIMESTAMP_DELTA_NS: u64 = (1 << 24) - 1; // 16,777,215
32
33/// Encode a u32 value as 3-byte little-endian (u24). Caller must ensure `value <= 0xFF_FFFF`.
34#[inline]
35pub(crate) fn encode_u24_le(value: u32, w: &mut impl Write) -> io::Result<()> {
36    debug_assert!(value <= MAX_TIMESTAMP_DELTA_NS as u32);
37    w.write_all(&[value as u8, (value >> 8) as u8, (value >> 16) as u8])
38}
39
/// Read a little-endian u24 from the first three bytes of `data`.
///
/// Bytes after the third are ignored. Returns `None` when `data` holds fewer
/// than three bytes.
#[inline]
pub(crate) fn decode_u24_le(data: &[u8]) -> Option<u32> {
    match data {
        &[lo, mid, hi, ..] => Some(u32::from_le_bytes([lo, mid, hi, 0])),
        _ => None,
    }
}
46
/// A string pool entry that owns its bytes.
#[derive(Clone, Debug, PartialEq)]
pub struct PoolEntry {
    /// Identifier the encoder assigned to this entry.
    pub pool_id: u32,
    /// The entry's raw string bytes.
    pub data: Vec<u8>,
}
55
56#[derive(Debug, Clone, PartialEq)]
57pub(crate) enum Frame {
58    Schema {
59        type_id: WireTypeId,
60        entry: SchemaEntry,
61    },
62    Event {
63        type_id: WireTypeId,
64        /// Absolute timestamp in nanoseconds, if the schema has `has_timestamp`.
65        timestamp_ns: Option<u64>,
66        values: Vec<FieldValue>,
67    },
68    StringPool(Vec<PoolEntry>),
69    TimestampReset(u64),
70}
71
/// A string pool entry that borrows its bytes from the decode buffer instead of
/// copying them.
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq)]
pub struct PoolEntryRef<'a> {
    /// Identifier the encoder assigned to this entry.
    pub pool_id: u32,
    /// The entry's raw string bytes, borrowed from the input buffer.
    pub data: &'a [u8],
}
81
82/// Zero-copy frame that borrows from the input buffer.
83#[derive(Debug, Clone, PartialEq)]
84pub(crate) enum FrameRef<'a> {
85    Schema {
86        type_id: WireTypeId,
87        entry: SchemaEntry,
88    },
89    Event {
90        type_id: WireTypeId,
91        timestamp_ns: Option<u64>,
92        values: Vec<FieldValueRef<'a>>,
93    },
94    StringPool(Vec<PoolEntryRef<'a>>),
95    TimestampReset(u64),
96}
97
/// The parts of a schema the event decoder needs: the raw per-field type tags
/// and whether events of this type carry a timestamp.
///
/// Tags are kept raw so the optional bit (0x80) is still available when the
/// decoder interprets each field.
pub(crate) struct SchemaInfo<'a> {
    /// One raw type tag per schema field, in field order.
    pub field_tags: &'a [u8],
    /// Whether an event of this type has a timestamp delta after its type id.
    pub has_timestamp: bool,
}
104
105// --- Encoding ---
106
107pub(crate) fn encode_header(w: &mut impl Write) -> io::Result<()> {
108    w.write_all(&MAGIC)?;
109    w.write_all(&[VERSION])
110}
111
112pub(crate) fn encode_schema(
113    type_id: WireTypeId,
114    entry: &SchemaEntry,
115    w: &mut impl Write,
116) -> io::Result<()> {
117    w.write_all(&[TAG_SCHEMA])?;
118    w.write_all(&type_id.0.to_le_bytes())?;
119    let name_bytes = entry.name.as_bytes();
120    w.write_all(&(name_bytes.len() as u16).to_le_bytes())?;
121    w.write_all(name_bytes)?;
122    w.write_all(&[if entry.has_timestamp { 1 } else { 0 }])?;
123    w.write_all(&(entry.fields.len() as u16).to_le_bytes())?;
124    for f in &entry.fields {
125        let fname = f.name.as_bytes();
126        w.write_all(&(fname.len() as u16).to_le_bytes())?;
127        w.write_all(fname)?;
128        w.write_all(&[f.field_type as u8])?;
129    }
130    Ok(())
131}
132
/// Encode an event frame and write it to `w`.
///
/// The frame is the event tag byte, the little-endian u16 type id, an optional
/// u24 little-endian timestamp delta, and then each value's own encoding in
/// order. Pass `Some(delta)` for schemas with `has_timestamp = true` and `None`
/// otherwise; the delta is written only when it is `Some`.
#[cfg(test)]
pub(crate) fn encode_event(
    type_id: WireTypeId,
    timestamp_delta_ns: Option<u32>,
    values: &[FieldValue],
    w: &mut impl Write,
) -> io::Result<()> {
    w.write_all(&[TAG_EVENT])?;
    w.write_all(&type_id.0.to_le_bytes())?;
    match timestamp_delta_ns {
        Some(delta_ns) => encode_u24_le(delta_ns, w)?,
        None => {}
    }
    for value in values.iter() {
        value.encode(w)?;
    }
    Ok(())
}
152
153pub(crate) fn encode_string_pool(entries: &[PoolEntry], w: &mut impl Write) -> io::Result<()> {
154    w.write_all(&[TAG_STRING_POOL])?;
155    w.write_all(&(entries.len() as u32).to_le_bytes())?;
156    for e in entries {
157        w.write_all(&e.pool_id.to_le_bytes())?;
158        w.write_all(&(e.data.len() as u32).to_le_bytes())?;
159        w.write_all(&e.data)?;
160    }
161    Ok(())
162}
163
164// --- Decoding ---
165
166pub(crate) fn decode_header(data: &[u8]) -> Option<u8> {
167    if data.get(..4)? != MAGIC {
168        return None;
169    }
170    let version = *data.get(4)?;
171    Some(version)
172}
173
174/// Decode a single frame starting at `data`. Returns (Frame, bytes_consumed).
175pub(crate) fn decode_frame<'s>(
176    data: &[u8],
177    schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
178    timestamp_base_ns: u64,
179) -> Option<(Frame, usize)> {
180    let tag = *data.first()?;
181    match tag {
182        TAG_SCHEMA => decode_schema_frame(data),
183        TAG_EVENT => decode_event_frame(data, schema_lookup, timestamp_base_ns),
184        TAG_STRING_POOL => decode_string_pool_frame(data),
185        TAG_TIMESTAMP_RESET => {
186            let ts = u64::from_le_bytes(data.get(1..9)?.try_into().ok()?);
187            Some((Frame::TimestampReset(ts), 9))
188        }
189        _ => None,
190    }
191}
192
193fn decode_schema_frame(data: &[u8]) -> Option<(Frame, usize)> {
194    let mut pos = 1; // skip tag
195    let type_id = WireTypeId(u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?));
196    pos += 2;
197    let name_len = u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?) as usize;
198    pos += 2;
199    let name = String::from_utf8(data.get(pos..pos + name_len)?.to_vec()).ok()?;
200    pos += name_len;
201    let has_timestamp = *data.get(pos)? != 0;
202    pos += 1;
203    let field_count = u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?) as usize;
204    pos += 2;
205    let mut fields = Vec::with_capacity(field_count);
206    for _ in 0..field_count {
207        let fname_len = u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?) as usize;
208        pos += 2;
209        let fname = String::from_utf8(data.get(pos..pos + fname_len)?.to_vec()).ok()?;
210        pos += fname_len;
211        let raw_tag = *data.get(pos)?;
212        let ft = FieldType::from_tag(raw_tag)?;
213        pos += 1;
214        fields.push(FieldDef {
215            name: fname,
216            field_type: ft,
217        });
218    }
219    Some((
220        Frame::Schema {
221            type_id,
222            entry: SchemaEntry {
223                name,
224                has_timestamp,
225                fields,
226            },
227        },
228        pos,
229    ))
230}
231
232fn decode_event_frame<'s>(
233    data: &[u8],
234    schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
235    timestamp_base_ns: u64,
236) -> Option<(Frame, usize)> {
237    let mut pos = 1; // skip tag
238    let type_id = WireTypeId(u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?));
239    pos += 2;
240    let info = schema_lookup(type_id)?;
241
242    let timestamp_ns = if info.has_timestamp {
243        let delta = decode_u24_le(&data[pos..])?;
244        pos += 3;
245        Some(timestamp_base_ns.checked_add(delta as u64)?)
246    } else {
247        None
248    };
249
250    let mut values = Vec::with_capacity(info.field_tags.len());
251    let mut remaining = &data[pos..];
252    for &tag in info.field_tags {
253        let ft = FieldType::from_tag(tag)?;
254        if ft.is_optional() {
255            let prefix = *remaining.first()?;
256            remaining = &remaining[1..];
257            if prefix == 0x00 {
258                values.push(FieldValue::None);
259            } else {
260                let (val, rest) = FieldValue::decode(ft.inner(), remaining)?;
261                values.push(val);
262                remaining = rest;
263            }
264        } else {
265            let (val, rest) = FieldValue::decode(ft, remaining)?;
266            values.push(val);
267            remaining = rest;
268        }
269    }
270    let consumed = data.len() - remaining.len();
271    Some((
272        Frame::Event {
273            type_id,
274            timestamp_ns,
275            values,
276        },
277        consumed,
278    ))
279}
280
281fn decode_string_pool_frame(data: &[u8]) -> Option<(Frame, usize)> {
282    let mut pos = 1;
283    let count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
284    pos += 4;
285    let mut entries = Vec::with_capacity(count.min((data.len() - pos) / 8));
286    for _ in 0..count {
287        let pool_id = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?);
288        pos += 4;
289        let len = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
290        pos += 4;
291        let d = data.get(pos..pos + len)?.to_vec();
292        pos += len;
293        entries.push(PoolEntry { pool_id, data: d });
294    }
295    Some((Frame::StringPool(entries), pos))
296}
297
298// --- Zero-copy decoding ---
299
300/// Decode a single frame without allocating owned data for field values.
301pub(crate) fn decode_frame_ref<'a, 's>(
302    data: &'a [u8],
303    schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
304    timestamp_base_ns: u64,
305) -> Option<(FrameRef<'a>, usize)> {
306    let tag = *data.first()?;
307    match tag {
308        TAG_SCHEMA => {
309            let (frame, consumed) = decode_schema_frame(data)?;
310            match frame {
311                Frame::Schema { type_id, entry } => {
312                    Some((FrameRef::Schema { type_id, entry }, consumed))
313                }
314                _ => unreachable!(),
315            }
316        }
317        TAG_EVENT => decode_event_frame_ref(data, schema_lookup, timestamp_base_ns),
318        TAG_STRING_POOL => decode_string_pool_frame_ref(data),
319        TAG_TIMESTAMP_RESET => {
320            let ts = u64::from_le_bytes(data.get(1..9)?.try_into().ok()?);
321            Some((FrameRef::TimestampReset(ts), 9))
322        }
323        _ => None,
324    }
325}
326
327fn decode_event_frame_ref<'a, 's>(
328    data: &'a [u8],
329    schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
330    timestamp_base_ns: u64,
331) -> Option<(FrameRef<'a>, usize)> {
332    let mut pos = 1;
333    let type_id = WireTypeId(u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?));
334    pos += 2;
335    let info = schema_lookup(type_id)?;
336
337    let timestamp_ns = if info.has_timestamp {
338        let delta = decode_u24_le(&data[pos..])?;
339        pos += 3;
340        Some(timestamp_base_ns.checked_add(delta as u64)?)
341    } else {
342        None
343    };
344
345    let mut values = Vec::with_capacity(info.field_tags.len());
346    for &tag in info.field_tags {
347        let ft = FieldType::from_tag(tag)?;
348        if ft.is_optional() {
349            let prefix = *data.get(pos)?;
350            pos += 1;
351            if prefix == 0x00 {
352                values.push(FieldValueRef::None);
353            } else {
354                let (val, consumed) = FieldValueRef::decode(ft.inner(), data, pos)?;
355                values.push(val);
356                pos += consumed;
357            }
358        } else {
359            let (val, consumed) = FieldValueRef::decode(ft, data, pos)?;
360            values.push(val);
361            pos += consumed;
362        }
363    }
364    Some((
365        FrameRef::Event {
366            type_id,
367            timestamp_ns,
368            values,
369        },
370        pos,
371    ))
372}
373
374fn decode_string_pool_frame_ref<'a>(data: &'a [u8]) -> Option<(FrameRef<'a>, usize)> {
375    let mut pos = 1;
376    let count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
377    pos += 4;
378    let mut entries = Vec::with_capacity(count.min((data.len() - pos) / 8));
379    for _ in 0..count {
380        let pool_id = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?);
381        pos += 4;
382        let len = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
383        pos += 4;
384        let d = data.get(pos..pos + len)?;
385        pos += len;
386        entries.push(PoolEntryRef { pool_id, data: d });
387    }
388    Some((FrameRef::StringPool(entries), pos))
389}
390
#[cfg(test)]
mod tests {
    use super::*;

    /// Schema lookup that recognises exactly one wire type id.
    fn only_schema<'t>(
        known: WireTypeId,
        field_tags: &'t [u8],
        has_timestamp: bool,
    ) -> impl Fn(WireTypeId) -> Option<SchemaInfo<'t>> {
        move |id| {
            if id == known {
                Some(SchemaInfo {
                    field_tags,
                    has_timestamp,
                })
            } else {
                None
            }
        }
    }

    /// Schema lookup that knows no types at all.
    fn no_schemas(_: WireTypeId) -> Option<SchemaInfo<'static>> {
        None
    }

    // --- Header tests ---

    #[test]
    fn header_encode_decode() {
        let mut encoded = Vec::new();
        encode_header(&mut encoded).unwrap();
        assert_eq!(encoded, [0x54, 0x52, 0x43, 0x00, 1]);
        assert_eq!(decode_header(&encoded), Some(1));
    }

    #[test]
    fn header_bad_magic() {
        let zeroed_magic = [0x00, 0x00, 0x00, 0x00, 1];
        assert_eq!(decode_header(&zeroed_magic), None);
    }

    #[test]
    fn header_too_short() {
        let partial_magic = [0x54, 0x52];
        assert_eq!(decode_header(&partial_magic), None);
    }

    // --- Schema frame tests ---

    #[test]
    fn schema_frame_round_trip() {
        let type_id = WireTypeId(1);
        let worker_field = FieldDef {
            name: "worker".into(),
            field_type: FieldType::Varint,
        };
        let entry = SchemaEntry {
            name: "PollStart".into(),
            has_timestamp: true,
            fields: vec![worker_field],
        };

        let mut bytes = Vec::new();
        encode_schema(type_id, &entry, &mut bytes).unwrap();
        assert_eq!(bytes[0], TAG_SCHEMA);

        let (decoded, used) = decode_frame(&bytes, no_schemas, 0).unwrap();
        assert_eq!(used, bytes.len());
        assert_eq!(decoded, Frame::Schema { type_id, entry });
    }

    #[test]
    fn schema_frame_empty_fields() {
        let type_id = WireTypeId(0);
        let entry = SchemaEntry {
            name: "Empty".into(),
            has_timestamp: false,
            fields: vec![],
        };

        let mut bytes = Vec::new();
        encode_schema(type_id, &entry, &mut bytes).unwrap();

        let (decoded, _) = decode_frame(&bytes, no_schemas, 0).unwrap();
        assert_eq!(decoded, Frame::Schema { type_id, entry });
    }

    // --- Event frame tests ---

    #[test]
    fn event_frame_round_trip() {
        let type_id = WireTypeId(1);
        let values = vec![
            FieldValue::Varint(12345),
            FieldValue::Bool(true),
            FieldValue::String("hi".to_string()),
        ];

        let mut bytes = Vec::new();
        encode_event(type_id, None, &values, &mut bytes).unwrap();
        assert_eq!(bytes[0], TAG_EVENT);

        let tags = [
            FieldType::Varint as u8,
            FieldType::Bool as u8,
            FieldType::String as u8,
        ];
        let (decoded, used) =
            decode_frame(&bytes, only_schema(type_id, &tags, false), 0).unwrap();
        assert_eq!(used, bytes.len());
        assert_eq!(
            decoded,
            Frame::Event {
                type_id,
                timestamp_ns: None,
                values
            }
        );
    }

    #[test]
    fn event_frame_with_timestamp_round_trip() {
        let type_id = WireTypeId(1);
        let values = vec![FieldValue::Varint(42)];

        let mut bytes = Vec::new();
        encode_event(type_id, Some(1_000_000), &values, &mut bytes).unwrap();

        let tags = [FieldType::Varint as u8];
        let (decoded, used) =
            decode_frame(&bytes, only_schema(type_id, &tags, true), 5_000_000).unwrap();
        assert_eq!(used, bytes.len());
        assert_eq!(
            decoded,
            Frame::Event {
                type_id,
                timestamp_ns: Some(5_000_000 + 1_000_000),
                values,
            }
        );
    }

    #[test]
    fn event_frame_unknown_type_id() {
        let mut bytes = Vec::new();
        encode_event(WireTypeId(99), None, &[FieldValue::Varint(1)], &mut bytes).unwrap();
        assert!(decode_frame(&bytes, no_schemas, 0).is_none());
    }

    #[test]
    fn event_frame_varint_compact() {
        let type_id = WireTypeId(2);
        let values = vec![FieldValue::Varint(1_050_000), FieldValue::Varint(3)];

        let mut bytes = Vec::new();
        encode_event(type_id, None, &values, &mut bytes).unwrap();
        assert!(
            bytes.len() <= 7,
            "varint PollEnd should be <=7 bytes, got {}",
            bytes.len()
        );

        let tags = [FieldType::Varint as u8, FieldType::Varint as u8];
        let (decoded, used) =
            decode_frame(&bytes, only_schema(type_id, &tags, false), 0).unwrap();
        assert_eq!(used, bytes.len());
        assert_eq!(
            decoded,
            Frame::Event {
                type_id,
                timestamp_ns: None,
                values
            }
        );
    }

    // --- String pool frame tests ---

    #[test]
    fn string_pool_round_trip() {
        let entries = vec![
            PoolEntry {
                pool_id: 0,
                data: b"main_thread".to_vec(),
            },
            PoolEntry {
                pool_id: 1,
                data: b"worker-1".to_vec(),
            },
        ];

        let mut bytes = Vec::new();
        encode_string_pool(&entries, &mut bytes).unwrap();
        assert_eq!(bytes[0], TAG_STRING_POOL);

        let (decoded, used) = decode_frame(&bytes, no_schemas, 0).unwrap();
        assert_eq!(used, bytes.len());
        assert_eq!(decoded, Frame::StringPool(entries));
    }

    #[test]
    fn string_pool_empty() {
        let mut bytes = Vec::new();
        encode_string_pool(&[], &mut bytes).unwrap();

        let (decoded, _) = decode_frame(&bytes, no_schemas, 0).unwrap();
        assert_eq!(decoded, Frame::StringPool(vec![]));
    }

    // --- Malformed input tests ---

    #[test]
    fn unknown_tag_returns_none() {
        assert!(decode_frame(&[0xFF], no_schemas, 0).is_none());
    }

    #[test]
    fn truncated_event_frame() {
        // Only one of the two type-id bytes is present.
        let bytes = [TAG_EVENT, 0x01];
        let tags = [FieldType::Varint as u8];
        let any_schema = |_| {
            Some(SchemaInfo {
                field_tags: &tags,
                has_timestamp: false,
            })
        };
        assert!(decode_frame(&bytes, any_schema, 0).is_none());
    }

    #[test]
    fn truncated_schema_frame() {
        // Tag and type id only; the name length is missing.
        let bytes = [TAG_SCHEMA, 0x00, 0x00];
        assert!(decode_frame(&bytes, no_schemas, 0).is_none());
    }
}