Skip to main content

dial9_trace_format/
codec.rs

1// Frame encoding/decoding (header, schema, event, string pool, symbol table)
2
3use crate::schema::{FieldDef, SchemaEntry};
4use crate::types::{FieldType, FieldValue, FieldValueRef};
5use std::io::{self, Write};
6
/// Schema/event type identifier exactly as serialized: a `u16` carried in
/// schema and event frame headers.
///
/// The encoder hands ids out sequentially; the decoder learns them from the
/// stream.
///
/// ## Note
/// An id is only meaningful within the file it was read from. It is not a
/// stable, static identifier.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct WireTypeId(pub u16);
14
/// File magic: ASCII `TRC` followed by a NUL byte.
pub const MAGIC: [u8; 4] = *b"TRC\0";
/// Format version byte written immediately after [`MAGIC`].
pub const VERSION: u8 = 1;
/// Header length in bytes: the magic plus the single version byte.
pub const HEADER_SIZE: usize = MAGIC.len() + 1;

/// Frame tag introducing a schema definition.
pub const TAG_SCHEMA: u8 = 0x01;
/// Frame tag introducing an event.
pub const TAG_EVENT: u8 = 0x02;
/// Frame tag introducing a batch of string pool entries.
pub const TAG_STRING_POOL: u8 = 0x03;
// Tags 0x04 and 0x06 are reserved (formerly SymbolTable and ProcMaps, now schema-based events).
/// Frame tag introducing a timestamp reset.
pub const TAG_TIMESTAMP_RESET: u8 = 0x05;

/// Largest nanosecond delta representable in a u24 (3 bytes): 16,777,215.
pub const MAX_TIMESTAMP_DELTA_NS: u64 = (1 << 24) - 1;
27
28/// Encode a u32 value as 3-byte little-endian (u24). Caller must ensure `value <= 0xFF_FFFF`.
29#[inline]
30pub fn encode_u24_le(value: u32, w: &mut impl Write) -> io::Result<()> {
31    debug_assert!(value <= MAX_TIMESTAMP_DELTA_NS as u32);
32    w.write_all(&[value as u8, (value >> 8) as u8, (value >> 16) as u8])
33}
34
/// Read a little-endian u24 from the first three bytes of `data`.
///
/// Bytes beyond the third are ignored. Returns `None` when `data` holds fewer
/// than three bytes.
#[inline]
pub fn decode_u24_le(data: &[u8]) -> Option<u32> {
    match *data {
        [low, mid, high, ..] => Some(u32::from_le_bytes([low, mid, high, 0])),
        _ => None,
    }
}
41
/// One string pool entry that owns its bytes.
#[derive(Clone, Debug, PartialEq)]
pub struct PoolEntry {
    /// Id under which the entry is stored in the pool (u32 on the wire).
    pub pool_id: u32,
    /// Raw entry payload.
    pub data: Vec<u8>,
}
47
48#[derive(Debug, Clone, PartialEq)]
49pub enum Frame {
50    Schema {
51        type_id: WireTypeId,
52        entry: SchemaEntry,
53    },
54    Event {
55        type_id: WireTypeId,
56        /// Absolute timestamp in nanoseconds, if the schema has `has_timestamp`.
57        timestamp_ns: Option<u64>,
58        values: Vec<FieldValue>,
59    },
60    StringPool(Vec<PoolEntry>),
61    TimestampReset(u64),
62}
63
/// String pool entry whose payload borrows from the input buffer (zero-copy).
#[derive(Clone, Debug, PartialEq)]
pub struct PoolEntryRef<'a> {
    /// Id under which the entry is stored in the pool (u32 on the wire).
    pub pool_id: u32,
    /// Raw entry payload, borrowed from the decoded buffer.
    pub data: &'a [u8],
}
70
71/// Zero-copy frame that borrows from the input buffer.
72#[derive(Debug, Clone, PartialEq)]
73pub enum FrameRef<'a> {
74    Schema {
75        type_id: WireTypeId,
76        entry: SchemaEntry,
77    },
78    Event {
79        type_id: WireTypeId,
80        timestamp_ns: Option<u64>,
81        values: Vec<FieldValueRef<'a>>,
82    },
83    StringPool(Vec<PoolEntryRef<'a>>),
84    TimestampReset(u64),
85}
86
87/// Schema info needed by the decoder: field types + has_timestamp flag.
88pub struct SchemaInfo<'a> {
89    pub field_types: &'a [FieldType],
90    pub has_timestamp: bool,
91}
92
93// --- Encoding ---
94
95pub fn encode_header(w: &mut impl Write) -> io::Result<()> {
96    w.write_all(&MAGIC)?;
97    w.write_all(&[VERSION])
98}
99
100pub fn encode_schema(
101    type_id: WireTypeId,
102    entry: &SchemaEntry,
103    w: &mut impl Write,
104) -> io::Result<()> {
105    w.write_all(&[TAG_SCHEMA])?;
106    w.write_all(&type_id.0.to_le_bytes())?;
107    let name_bytes = entry.name.as_bytes();
108    w.write_all(&(name_bytes.len() as u16).to_le_bytes())?;
109    w.write_all(name_bytes)?;
110    w.write_all(&[if entry.has_timestamp { 1 } else { 0 }])?;
111    w.write_all(&(entry.fields.len() as u16).to_le_bytes())?;
112    for f in &entry.fields {
113        let fname = f.name.as_bytes();
114        w.write_all(&(fname.len() as u16).to_le_bytes())?;
115        w.write_all(fname)?;
116        w.write_all(&[f.field_type as u8])?;
117    }
118    Ok(())
119}
120
121/// Encode an event frame. If `timestamp_delta_ns` is Some, writes a u24 LE delta
122/// after the type_id (for schemas with `has_timestamp = true`).
123pub fn encode_event(
124    type_id: WireTypeId,
125    timestamp_delta_ns: Option<u32>,
126    values: &[FieldValue],
127    w: &mut impl Write,
128) -> io::Result<()> {
129    w.write_all(&[TAG_EVENT])?;
130    w.write_all(&type_id.0.to_le_bytes())?;
131    if let Some(delta) = timestamp_delta_ns {
132        encode_u24_le(delta, w)?;
133    }
134    for v in values {
135        v.encode(w)?;
136    }
137    Ok(())
138}
139
140pub fn encode_string_pool(entries: &[PoolEntry], w: &mut impl Write) -> io::Result<()> {
141    w.write_all(&[TAG_STRING_POOL])?;
142    w.write_all(&(entries.len() as u32).to_le_bytes())?;
143    for e in entries {
144        w.write_all(&e.pool_id.to_le_bytes())?;
145        w.write_all(&(e.data.len() as u32).to_le_bytes())?;
146        w.write_all(&e.data)?;
147    }
148    Ok(())
149}
150
151// --- Decoding ---
152
153pub fn decode_header(data: &[u8]) -> Option<u8> {
154    if data.get(..4)? != MAGIC {
155        return None;
156    }
157    let version = *data.get(4)?;
158    Some(version)
159}
160
161/// Decode a single frame starting at `data`. Returns (Frame, bytes_consumed).
162pub fn decode_frame<'s>(
163    data: &[u8],
164    schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
165    timestamp_base_ns: u64,
166) -> Option<(Frame, usize)> {
167    let tag = *data.first()?;
168    match tag {
169        TAG_SCHEMA => decode_schema_frame(data),
170        TAG_EVENT => decode_event_frame(data, schema_lookup, timestamp_base_ns),
171        TAG_STRING_POOL => decode_string_pool_frame(data),
172        TAG_TIMESTAMP_RESET => {
173            let ts = u64::from_le_bytes(data.get(1..9)?.try_into().ok()?);
174            Some((Frame::TimestampReset(ts), 9))
175        }
176        _ => None,
177    }
178}
179
180fn decode_schema_frame(data: &[u8]) -> Option<(Frame, usize)> {
181    let mut pos = 1; // skip tag
182    let type_id = WireTypeId(u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?));
183    pos += 2;
184    let name_len = u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?) as usize;
185    pos += 2;
186    let name = String::from_utf8(data.get(pos..pos + name_len)?.to_vec()).ok()?;
187    pos += name_len;
188    let has_timestamp = *data.get(pos)? != 0;
189    pos += 1;
190    let field_count = u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?) as usize;
191    pos += 2;
192    let mut fields = Vec::with_capacity(field_count);
193    for _ in 0..field_count {
194        let fname_len = u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?) as usize;
195        pos += 2;
196        let fname = String::from_utf8(data.get(pos..pos + fname_len)?.to_vec()).ok()?;
197        pos += fname_len;
198        let ft = FieldType::from_tag(*data.get(pos)?)?;
199        pos += 1;
200        fields.push(FieldDef {
201            name: fname,
202            field_type: ft,
203        });
204    }
205    Some((
206        Frame::Schema {
207            type_id,
208            entry: SchemaEntry {
209                name,
210                has_timestamp,
211                fields,
212            },
213        },
214        pos,
215    ))
216}
217
218fn decode_event_frame<'s>(
219    data: &[u8],
220    schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
221    timestamp_base_ns: u64,
222) -> Option<(Frame, usize)> {
223    let mut pos = 1; // skip tag
224    let type_id = WireTypeId(u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?));
225    pos += 2;
226    let info = schema_lookup(type_id)?;
227
228    let timestamp_ns = if info.has_timestamp {
229        let delta = decode_u24_le(&data[pos..])?;
230        pos += 3;
231        Some(timestamp_base_ns.checked_add(delta as u64)?)
232    } else {
233        None
234    };
235
236    let mut values = Vec::with_capacity(info.field_types.len());
237    let mut remaining = &data[pos..];
238    for ft in info.field_types {
239        let (val, rest) = FieldValue::decode(*ft, remaining)?;
240        values.push(val);
241        remaining = rest;
242    }
243    let consumed = data.len() - remaining.len();
244    Some((
245        Frame::Event {
246            type_id,
247            timestamp_ns,
248            values,
249        },
250        consumed,
251    ))
252}
253
254fn decode_string_pool_frame(data: &[u8]) -> Option<(Frame, usize)> {
255    let mut pos = 1;
256    let count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
257    pos += 4;
258    let mut entries = Vec::with_capacity(count.min((data.len() - pos) / 8));
259    for _ in 0..count {
260        let pool_id = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?);
261        pos += 4;
262        let len = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
263        pos += 4;
264        let d = data.get(pos..pos + len)?.to_vec();
265        pos += len;
266        entries.push(PoolEntry { pool_id, data: d });
267    }
268    Some((Frame::StringPool(entries), pos))
269}
270
271// --- Zero-copy decoding ---
272
273/// Decode a single frame without allocating owned data for field values.
274pub fn decode_frame_ref<'a, 's>(
275    data: &'a [u8],
276    schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
277    timestamp_base_ns: u64,
278) -> Option<(FrameRef<'a>, usize)> {
279    let tag = *data.first()?;
280    match tag {
281        TAG_SCHEMA => {
282            let (frame, consumed) = decode_schema_frame(data)?;
283            match frame {
284                Frame::Schema { type_id, entry } => {
285                    Some((FrameRef::Schema { type_id, entry }, consumed))
286                }
287                _ => unreachable!(),
288            }
289        }
290        TAG_EVENT => decode_event_frame_ref(data, schema_lookup, timestamp_base_ns),
291        TAG_STRING_POOL => decode_string_pool_frame_ref(data),
292        TAG_TIMESTAMP_RESET => {
293            let ts = u64::from_le_bytes(data.get(1..9)?.try_into().ok()?);
294            Some((FrameRef::TimestampReset(ts), 9))
295        }
296        _ => None,
297    }
298}
299
300fn decode_event_frame_ref<'a, 's>(
301    data: &'a [u8],
302    schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
303    timestamp_base_ns: u64,
304) -> Option<(FrameRef<'a>, usize)> {
305    let mut pos = 1;
306    let type_id = WireTypeId(u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?));
307    pos += 2;
308    let info = schema_lookup(type_id)?;
309
310    let timestamp_ns = if info.has_timestamp {
311        let delta = decode_u24_le(&data[pos..])?;
312        pos += 3;
313        Some(timestamp_base_ns.checked_add(delta as u64)?)
314    } else {
315        None
316    };
317
318    let mut values = Vec::with_capacity(info.field_types.len());
319    for ft in info.field_types {
320        let (val, consumed) = FieldValueRef::decode(*ft, data, pos)?;
321        values.push(val);
322        pos += consumed;
323    }
324    Some((
325        FrameRef::Event {
326            type_id,
327            timestamp_ns,
328            values,
329        },
330        pos,
331    ))
332}
333
334fn decode_string_pool_frame_ref<'a>(data: &'a [u8]) -> Option<(FrameRef<'a>, usize)> {
335    let mut pos = 1;
336    let count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
337    pos += 4;
338    let mut entries = Vec::with_capacity(count.min((data.len() - pos) / 8));
339    for _ in 0..count {
340        let pool_id = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?);
341        pos += 4;
342        let len = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
343        pos += 4;
344        let d = data.get(pos..pos + len)?;
345        pos += len;
346        entries.push(PoolEntryRef { pool_id, data: d });
347    }
348    Some((FrameRef::StringPool(entries), pos))
349}
350
#[cfg(test)]
mod tests {
    use super::*;

    /// Schema lookup that knows exactly one type id and rejects every other.
    fn single_schema<'t>(
        known: WireTypeId,
        field_types: &'t [FieldType],
        has_timestamp: bool,
    ) -> impl Fn(WireTypeId) -> Option<SchemaInfo<'t>> {
        move |id| {
            if id == known {
                Some(SchemaInfo {
                    field_types,
                    has_timestamp,
                })
            } else {
                None
            }
        }
    }

    /// Schema lookup for frames that never consult a schema.
    fn no_schemas(_: WireTypeId) -> Option<SchemaInfo<'static>> {
        None
    }

    // --- Header tests ---

    #[test]
    fn header_encode_decode() {
        let mut encoded = Vec::new();
        encode_header(&mut encoded).unwrap();
        assert_eq!(encoded, [0x54, 0x52, 0x43, 0x00, 1]);
        assert_eq!(decode_header(&encoded), Some(1));
    }

    #[test]
    fn header_bad_magic() {
        let bytes = [0x00, 0x00, 0x00, 0x00, 1];
        assert_eq!(decode_header(&bytes), None);
    }

    #[test]
    fn header_too_short() {
        let bytes = [0x54, 0x52];
        assert_eq!(decode_header(&bytes), None);
    }

    // --- Schema frame tests ---

    #[test]
    fn schema_frame_round_trip() {
        let type_id = WireTypeId(1);
        let worker_field = FieldDef {
            name: "worker".into(),
            field_type: FieldType::Varint,
        };
        let entry = SchemaEntry {
            name: "PollStart".into(),
            has_timestamp: true,
            fields: vec![worker_field],
        };

        let mut encoded = Vec::new();
        encode_schema(type_id, &entry, &mut encoded).unwrap();
        assert_eq!(encoded[0], TAG_SCHEMA);

        let (frame, consumed) = decode_frame(&encoded, no_schemas, 0).unwrap();
        assert_eq!(consumed, encoded.len());
        assert_eq!(frame, Frame::Schema { type_id, entry });
    }

    #[test]
    fn schema_frame_empty_fields() {
        let type_id = WireTypeId(0);
        let entry = SchemaEntry {
            name: "Empty".into(),
            has_timestamp: false,
            fields: vec![],
        };

        let mut encoded = Vec::new();
        encode_schema(type_id, &entry, &mut encoded).unwrap();

        let (frame, _) = decode_frame(&encoded, no_schemas, 0).unwrap();
        assert_eq!(frame, Frame::Schema { type_id, entry });
    }

    // --- Event frame tests ---

    #[test]
    fn event_frame_round_trip() {
        let type_id = WireTypeId(1);
        let values = vec![
            FieldValue::Varint(12345),
            FieldValue::Bool(true),
            FieldValue::String("hi".to_string()),
        ];

        let mut encoded = Vec::new();
        encode_event(type_id, None, &values, &mut encoded).unwrap();
        assert_eq!(encoded[0], TAG_EVENT);

        let types = vec![FieldType::Varint, FieldType::Bool, FieldType::String];
        let lookup = single_schema(type_id, &types, false);
        let (frame, consumed) = decode_frame(&encoded, lookup, 0).unwrap();
        assert_eq!(consumed, encoded.len());
        assert_eq!(
            frame,
            Frame::Event {
                type_id: WireTypeId(1),
                timestamp_ns: None,
                values
            }
        );
    }

    #[test]
    fn event_frame_with_timestamp_round_trip() {
        let type_id = WireTypeId(1);
        let values = vec![FieldValue::Varint(42)];

        let mut encoded = Vec::new();
        encode_event(type_id, Some(1_000_000), &values, &mut encoded).unwrap();

        let types = vec![FieldType::Varint];
        let lookup = single_schema(type_id, &types, true);
        let (frame, consumed) = decode_frame(&encoded, lookup, 5_000_000).unwrap();
        assert_eq!(consumed, encoded.len());
        assert_eq!(
            frame,
            Frame::Event {
                type_id: WireTypeId(1),
                timestamp_ns: Some(5_000_000 + 1_000_000),
                values,
            }
        );
    }

    #[test]
    fn event_frame_unknown_type_id() {
        let mut encoded = Vec::new();
        encode_event(WireTypeId(99), None, &[FieldValue::Varint(1)], &mut encoded).unwrap();
        assert!(decode_frame(&encoded, no_schemas, 0).is_none());
    }

    #[test]
    fn event_frame_varint_compact() {
        let type_id = WireTypeId(2);
        let values = vec![FieldValue::Varint(1_050_000), FieldValue::Varint(3)];

        let mut encoded = Vec::new();
        encode_event(type_id, None, &values, &mut encoded).unwrap();
        assert!(
            encoded.len() <= 7,
            "varint PollEnd should be <=7 bytes, got {}",
            encoded.len()
        );

        let types = vec![FieldType::Varint, FieldType::Varint];
        let lookup = single_schema(type_id, &types, false);
        let (frame, consumed) = decode_frame(&encoded, lookup, 0).unwrap();
        assert_eq!(consumed, encoded.len());
        assert_eq!(
            frame,
            Frame::Event {
                type_id: WireTypeId(2),
                timestamp_ns: None,
                values
            }
        );
    }

    // --- String pool frame tests ---

    #[test]
    fn string_pool_round_trip() {
        let entries = vec![
            PoolEntry {
                pool_id: 0,
                data: b"main_thread".to_vec(),
            },
            PoolEntry {
                pool_id: 1,
                data: b"worker-1".to_vec(),
            },
        ];

        let mut encoded = Vec::new();
        encode_string_pool(&entries, &mut encoded).unwrap();
        assert_eq!(encoded[0], TAG_STRING_POOL);

        let (frame, consumed) = decode_frame(&encoded, no_schemas, 0).unwrap();
        assert_eq!(consumed, encoded.len());
        assert_eq!(frame, Frame::StringPool(entries));
    }

    #[test]
    fn string_pool_empty() {
        let mut encoded = Vec::new();
        encode_string_pool(&[], &mut encoded).unwrap();

        let (frame, _) = decode_frame(&encoded, no_schemas, 0).unwrap();
        assert_eq!(frame, Frame::StringPool(vec![]));
    }

    #[test]
    fn unknown_tag_returns_none() {
        assert!(decode_frame(&[0xFF], no_schemas, 0).is_none());
    }
}