Skip to main content

dial9_trace_format/
decoder.rs

1// Streaming decoder
2
3use crate::codec::{
4    self, Frame, FrameRef, HEADER_SIZE, PoolEntry, PoolEntryRef, SchemaInfo, WireTypeId,
5};
6use crate::schema::{SchemaEntry, SchemaRegistry};
7use crate::types::{FieldType, FieldValueRef, InternedString};
8use std::collections::HashMap;
9use std::fmt;
10
/// Failure reported when the decoder cannot make further progress on a stream.
///
/// Frames carry no length prefix, so once a frame fails to decode there is no
/// way to locate the start of the next one — the error is unrecoverable.
#[derive(Debug, Clone)]
pub struct DecodeError {
    /// Byte offset into the input associated with the failure.
    pub pos: usize,
    /// Human-readable description of what went wrong.
    pub message: String,
}

impl fmt::Display for DecodeError {
    /// Renders the error as `decode error at byte <pos>: <message>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!(
            "decode error at byte {}: {}",
            self.pos, self.message
        ))
    }
}

impl std::error::Error for DecodeError {}
27
/// A decoded event passed to [`Decoder::for_each_event`].
///
/// `'a` is the lifetime of the input data buffer (strings, stack frames borrow from it).
/// `'f` is the lifetime of the `fields` slice and schema name (reused across calls).
#[non_exhaustive]
pub struct RawEvent<'a, 'f> {
    /// Wire type id read from the event frame; selects the schema used to decode it.
    pub type_id: WireTypeId,
    /// Name of the schema registered for `type_id`.
    pub name: &'f str,
    /// Absolute timestamp in nanoseconds, or `None` when the schema has no timestamp.
    pub timestamp_ns: Option<u64>,
    /// Decoded field values, in the order of the schema's field types.
    pub fields: &'f [FieldValueRef<'a>],
    /// The decoder's string pool as populated up to this event.
    pub string_pool: &'f StringPool,
}
40
41/// A map from interned string IDs to their resolved string values.
42///
43/// Populated automatically by the [`Decoder`] as it processes `StringPool` frames.
44/// Pass a reference to [`crate::TraceEvent::decode`] so that `InternedString` fields
45/// resolve to `&str` in derived `Ref` types.
46#[derive(Debug, Clone, Default)]
47pub struct StringPool(pub(crate) HashMap<InternedString, String>);
48
49impl StringPool {
50    pub(crate) fn new() -> Self {
51        Self(HashMap::new())
52    }
53
54    pub(crate) fn insert(&mut self, id: InternedString, value: String) {
55        self.0.insert(id, value);
56    }
57
58    pub fn get(&self, id: InternedString) -> Option<&str> {
59        self.0.get(&id).map(|s| s.as_str())
60    }
61
62    pub fn len(&self) -> usize {
63        self.0.len()
64    }
65
66    pub fn is_empty(&self) -> bool {
67        self.0.is_empty()
68    }
69
70    /// Iterate over all interned strings as `(id, value)` pairs.
71    pub fn iter(&self) -> impl Iterator<Item = (InternedString, &str)> {
72        self.0.iter().map(|(&id, v)| (id, v.as_str()))
73    }
74}
75
/// Decoded events yielded by the decoder.
///
/// This is the owned form returned by [`Decoder::next_frame`]; see
/// [`DecodedFrameRef`] for the variant that borrows from the input buffer.
#[derive(Debug, Clone, PartialEq)]
pub enum DecodedFrame {
    /// A schema definition; the decoder has already registered it.
    Schema(SchemaEntry),
    /// An event decoded against a previously registered schema.
    Event {
        /// Wire type id of the schema the event was decoded with.
        type_id: WireTypeId,
        /// Absolute timestamp in nanoseconds, if the schema has `has_timestamp`.
        timestamp_ns: Option<u64>,
        /// Owned field values of the event.
        values: Vec<crate::types::FieldValue>,
    },
    /// String-pool entries; valid UTF-8 entries have been added to the decoder's pool.
    StringPool(Vec<PoolEntry>),
}
88
/// Zero-copy decoded frame that borrows from the input buffer.
///
/// Returned by [`Decoder::next_frame_ref`] and by the decoder's `Iterator` impl.
#[derive(Debug, Clone, PartialEq)]
pub enum DecodedFrameRef<'a> {
    /// A schema definition; the decoder has already registered it.
    Schema(SchemaEntry),
    /// An event decoded against a previously registered schema.
    Event {
        /// Wire type id of the schema the event was decoded with.
        type_id: WireTypeId,
        /// Absolute timestamp in nanoseconds, or `None` if the event carries none.
        timestamp_ns: Option<u64>,
        /// Field values borrowing from the input buffer.
        values: Vec<FieldValueRef<'a>>,
    },
    /// String-pool entries; valid UTF-8 entries have been added to the decoder's pool.
    StringPool(Vec<PoolEntryRef<'a>>),
}
100
/// Per-type-id schema data kept by the decoder for decoding event frames.
///
/// Built from a `SchemaEntry` in `Decoder::register_schema`.
struct SchemaCache {
    /// Schema name, handed out as `RawEvent::name`.
    name: String,
    /// Field types in schema order; one value is decoded per entry.
    field_types: Vec<FieldType>,
    /// Whether events of this type carry a timestamp delta.
    has_timestamp: bool,
}
106
/// Streaming decoder over an in-memory trace buffer.
///
/// Tracks the schemas, interned strings and timestamp base seen so far so that
/// later event frames can be decoded.
pub struct Decoder<'a> {
    /// The complete input buffer, including the file header.
    data: &'a [u8],
    /// Byte offset of the next undecoded frame (starts at `HEADER_SIZE`).
    pos: usize,
    /// Every schema registered so far.
    registry: SchemaRegistry,
    /// Decoding-oriented view of the registered schemas, keyed by wire type id.
    schema_cache: HashMap<WireTypeId, SchemaCache>,
    /// Interned strings collected from string-pool frames.
    string_pool: StringPool,
    /// Format version returned by `codec::decode_header`.
    version: u8,
    /// Base that event timestamp deltas are added to; updated by each
    /// timestamped event and by timestamp-reset frames.
    timestamp_base_ns: u64,
}
116
117impl<'a> Decoder<'a> {
118    pub fn new(data: &'a [u8]) -> Option<Self> {
119        let version = codec::decode_header(data)?;
120        Some(Self {
121            data,
122            pos: HEADER_SIZE,
123            registry: SchemaRegistry::new(),
124            schema_cache: HashMap::new(),
125            string_pool: StringPool::new(),
126            version,
127            timestamp_base_ns: 0,
128        })
129    }
130
    /// The schemas registered from the frames decoded so far.
    pub fn registry(&self) -> &SchemaRegistry {
        &self.registry
    }
134
    /// The format version read from the stream header.
    pub fn version(&self) -> u8 {
        self.version
    }
138
    /// The interned strings collected from the string-pool frames decoded so far.
    pub fn string_pool(&self) -> &StringPool {
        &self.string_pool
    }
142
143    /// Consume this decoder and create an [`Encoder`] that appends to the
144    /// decoded trace. The encoder inherits the string pool, schema registry,
145    /// and timestamp base so new frames are compatible with the existing data.
146    ///
147    /// No file header is written — the caller is responsible for concatenating
148    /// the encoder's output after the original trace bytes.
149    pub fn into_encoder<W: std::io::Write>(self, writer: W) -> crate::encoder::Encoder<W> {
150        crate::encoder::Encoder::from_decoder(
151            self.registry,
152            self.string_pool,
153            self.timestamp_base_ns,
154            writer,
155        )
156    }
157
158    fn schema_info(&self, type_id: WireTypeId) -> Option<SchemaInfo<'_>> {
159        self.schema_cache.get(&type_id).map(|c| SchemaInfo {
160            field_types: &c.field_types,
161            has_timestamp: c.has_timestamp,
162        })
163    }
164
165    fn register_schema(&mut self, type_id: WireTypeId, entry: SchemaEntry) -> Result<(), String> {
166        self.schema_cache.insert(
167            type_id,
168            SchemaCache {
169                name: entry.name.clone(),
170                field_types: entry.fields.iter().map(|f| f.field_type).collect(),
171                has_timestamp: entry.has_timestamp,
172            },
173        );
174        self.registry.register(type_id, entry)
175    }
176
177    /// Decode the next frame. Returns `Ok(None)` when stream is exhausted.
178    /// Returns `Err` if the stream is malformed (e.g. duplicate type_id with
179    /// a different schema).
180    pub fn next_frame(&mut self) -> Result<Option<DecodedFrame>, DecodeError> {
181        if self.pos >= self.data.len() {
182            return Ok(None);
183        }
184        let remaining = &self.data[self.pos..];
185        let base = self.timestamp_base_ns;
186        let (frame, consumed) =
187            match codec::decode_frame(remaining, |type_id| self.schema_info(type_id), base) {
188                Some(r) => r,
189                None => return Ok(None),
190            };
191        self.pos += consumed;
192        match frame {
193            Frame::Schema { type_id, entry } => {
194                let result = DecodedFrame::Schema(entry.clone());
195                self.register_schema(type_id, entry)
196                    .map_err(|msg| DecodeError {
197                        pos: self.pos,
198                        message: msg,
199                    })?;
200                Ok(Some(result))
201            }
202            Frame::Event {
203                type_id,
204                timestamp_ns,
205                values,
206            } => {
207                if let Some(ts) = timestamp_ns {
208                    self.timestamp_base_ns = ts;
209                }
210                Ok(Some(DecodedFrame::Event {
211                    type_id,
212                    timestamp_ns,
213                    values,
214                }))
215            }
216            Frame::StringPool(entries) => {
217                for e in &entries {
218                    if let Ok(s) = String::from_utf8(e.data.clone()) {
219                        self.string_pool.insert(InternedString(e.pool_id), s);
220                    }
221                }
222                Ok(Some(DecodedFrame::StringPool(entries)))
223            }
224            Frame::TimestampReset(ts) => {
225                self.timestamp_base_ns = ts;
226                self.next_frame() // consume silently, return next real frame
227            }
228        }
229    }
230
231    /// Collect all remaining frames. Stops on error or end of stream.
232    pub fn decode_all(&mut self) -> Vec<DecodedFrame> {
233        let mut frames = Vec::new();
234        while let Ok(Some(f)) = self.next_frame() {
235            frames.push(f);
236        }
237        frames
238    }
239
240    /// Decode the next frame without copying field data. Returns `Ok(None)` when
241    /// stream is exhausted. Returns `Err` on malformed data.
242    pub fn next_frame_ref(&mut self) -> Result<Option<DecodedFrameRef<'a>>, DecodeError> {
243        if self.pos >= self.data.len() {
244            return Ok(None);
245        }
246        let remaining = &self.data[self.pos..];
247        let base = self.timestamp_base_ns;
248        let (frame, consumed) =
249            match codec::decode_frame_ref(remaining, |type_id| self.schema_info(type_id), base) {
250                Some(r) => r,
251                None => return Ok(None),
252            };
253        self.pos += consumed;
254        match frame {
255            FrameRef::Schema { type_id, entry } => {
256                let result = DecodedFrameRef::Schema(entry.clone());
257                self.register_schema(type_id, entry)
258                    .map_err(|msg| DecodeError {
259                        pos: self.pos,
260                        message: msg,
261                    })?;
262                Ok(Some(result))
263            }
264            FrameRef::Event {
265                type_id,
266                timestamp_ns,
267                values,
268            } => {
269                if let Some(ts) = timestamp_ns {
270                    self.timestamp_base_ns = ts;
271                }
272                Ok(Some(DecodedFrameRef::Event {
273                    type_id,
274                    timestamp_ns,
275                    values,
276                }))
277            }
278            FrameRef::StringPool(entries) => {
279                for e in &entries {
280                    if let Ok(s) = std::str::from_utf8(e.data) {
281                        self.string_pool
282                            .insert(InternedString(e.pool_id), s.to_string());
283                    }
284                }
285                Ok(Some(DecodedFrameRef::StringPool(entries)))
286            }
287            FrameRef::TimestampReset(ts) => {
288                self.timestamp_base_ns = ts;
289                self.next_frame_ref()
290            }
291        }
292    }
293
294    /// Collect all remaining frames using zero-copy decoding. Stops on error or end of stream.
295    pub fn decode_all_ref(&mut self) -> Vec<DecodedFrameRef<'a>> {
296        let mut frames = Vec::new();
297        while let Ok(Some(f)) = self.next_frame_ref() {
298            frames.push(f);
299        }
300        frames
301    }
302
303    /// Process all events with a callback, avoiding per-event Vec allocations.
304    /// Schemas and string pools are registered automatically.
305    ///
306    /// The [`RawEvent`] passed to the callback borrows from the decoder's input
307    /// buffer. The `fields` slice is reused across calls, so values cannot be
308    /// stored across iterations without copying.
309    ///
310    /// Returns `Err` if the stream is malformed.
311    pub fn for_each_event(
312        &mut self,
313        mut f: impl for<'f> FnMut(RawEvent<'a, 'f>),
314    ) -> Result<(), DecodeError> {
315        let mut values_buf: Vec<FieldValueRef<'a>> = Vec::new();
316        while self.pos < self.data.len() {
317            let remaining = &self.data[self.pos..];
318            let tag = match remaining.first() {
319                Some(t) => *t,
320                None => break,
321            };
322            match tag {
323                codec::TAG_EVENT => {
324                    let mut pos = 1;
325                    let type_id = match remaining.get(pos..pos + 2) {
326                        Some(b) => {
327                            pos += 2;
328                            WireTypeId(u16::from_le_bytes(b.try_into().unwrap()))
329                        }
330                        None => {
331                            return Err(DecodeError {
332                                pos: self.pos,
333                                message: "truncated event frame".into(),
334                            });
335                        }
336                    };
337                    let cache = match self.schema_cache.get(&type_id) {
338                        Some(c) => c,
339                        None => {
340                            return Err(DecodeError {
341                                pos: self.pos,
342                                message: format!("unknown type_id {type_id:?}"),
343                            });
344                        }
345                    };
346
347                    let timestamp_ns = if cache.has_timestamp {
348                        match codec::decode_u24_le(&remaining[pos..]) {
349                            Some(delta) => {
350                                pos += 3;
351                                Some(self.timestamp_base_ns + delta as u64)
352                            }
353                            None => {
354                                return Err(DecodeError {
355                                    pos: self.pos + pos,
356                                    message: "truncated timestamp delta".into(),
357                                });
358                            }
359                        }
360                    } else {
361                        None
362                    };
363
364                    values_buf.clear();
365                    for ft in &cache.field_types {
366                        match FieldValueRef::decode(*ft, remaining, pos) {
367                            Some((val, consumed)) => {
368                                values_buf.push(val);
369                                pos += consumed;
370                            }
371                            None => {
372                                return Err(DecodeError {
373                                    pos: self.pos + pos,
374                                    message: "truncated field value".into(),
375                                });
376                            }
377                        }
378                    }
379                    self.pos += pos;
380                    if let Some(ts) = timestamp_ns {
381                        self.timestamp_base_ns = ts;
382                    }
383                    f(RawEvent {
384                        type_id,
385                        name: &cache.name,
386                        timestamp_ns,
387                        fields: &values_buf,
388                        string_pool: &self.string_pool,
389                    });
390                }
391                codec::TAG_TIMESTAMP_RESET => {
392                    // Handle timestamp resets inline to avoid next_frame_ref's
393                    // recursive consumption of the following frame.
394                    let ts = match self.data.get(self.pos + 1..self.pos + 9) {
395                        Some(b) => u64::from_le_bytes(b.try_into().unwrap()),
396                        None => {
397                            return Err(DecodeError {
398                                pos: self.pos,
399                                message: "truncated timestamp reset".into(),
400                            });
401                        }
402                    };
403                    self.timestamp_base_ns = ts;
404                    self.pos += 9;
405                }
406                _ => {
407                    // Use next_frame_ref for non-event frames (schema, pool, symbol table)
408                    // `next_frame_ref` will update the decoder state as we read the frames (e.g. the pooled strings)
409                    match self.next_frame_ref() {
410                        Ok(Some(_)) => {}
411                        Ok(None) => {
412                            return Err(DecodeError {
413                                pos: self.pos,
414                                message: format!("failed to decode frame with tag 0x{tag:02x}"),
415                            });
416                        }
417                        Err(e) => return Err(e),
418                    }
419                }
420            }
421        }
422        Ok(())
423    }
424
    /// Returns an iterator that yields only [`DecodedFrameRef::Event`] variants,
    /// silently consuming schema, string-pool, and symbol-table frames
    /// (while still updating internal decoder state).
    ///
    /// The iterator borrows the decoder mutably, so the decoder's position
    /// advances as the iterator is driven.
    pub fn events(&mut self) -> EventIter<'_, 'a> {
        EventIter { decoder: self }
    }
431}
432
433impl<'a> Iterator for Decoder<'a> {
434    type Item = Result<DecodedFrameRef<'a>, DecodeError>;
435
436    fn next(&mut self) -> Option<Self::Item> {
437        self.next_frame_ref().transpose()
438    }
439}
440
/// Iterator that yields only [`DecodedFrameRef::Event`] frames,
/// consuming non-event frames to keep decoder state up to date.
///
/// `'d` is the lifetime of the mutable borrow of the decoder; `'a` is the
/// lifetime of the decoder's input buffer.
pub struct EventIter<'d, 'a> {
    /// The decoder being driven; its position advances as events are yielded.
    decoder: &'d mut Decoder<'a>,
}
446
447impl<'d, 'a> Iterator for EventIter<'d, 'a> {
448    type Item = Result<DecodedFrameRef<'a>, DecodeError>;
449
450    fn next(&mut self) -> Option<Self::Item> {
451        loop {
452            match self.decoder.next()? {
453                Ok(frame @ DecodedFrameRef::Event { .. }) => return Some(Ok(frame)),
454                Ok(_) => continue, // schema, string pool, symbol table — skip
455                Err(e) => return Some(Err(e)),
456            }
457        }
458    }
459}
460
/// Round-trip tests: each test encodes a small trace with `Encoder` and checks
/// what the decoder yields for it.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::encoder::Encoder;
    use crate::schema::FieldDef;
    use crate::types::{FieldType, FieldValue};

    /// A trace with nothing but a header reports version 1 and no frames.
    #[test]
    fn decode_empty_stream() {
        let enc = Encoder::new();
        let data = enc.finish();
        let mut dec = Decoder::new(&data).unwrap();
        assert_eq!(dec.version(), 1);
        assert!(dec.next_frame().unwrap().is_none());
    }

    /// Registering a schema in the encoder produces a schema frame with the same name.
    #[test]
    fn decode_schema_frame() {
        let mut enc = Encoder::new();
        enc.register_schema(
            "Ev",
            vec![FieldDef {
                name: "v".into(),
                field_type: FieldType::Varint,
            }],
        )
        .unwrap();
        let data = enc.finish();
        let mut dec = Decoder::new(&data).unwrap();
        let frame = dec.next_frame().unwrap().unwrap();
        assert!(matches!(frame, DecodedFrame::Schema(s) if s.name == "Ev"));
    }

    /// An event written after its schema decodes to the schema's single field value.
    #[test]
    fn decode_event_after_schema() {
        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "Ev",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();
        // NOTE(review): the leading value is presumably the event timestamp — only
        // `Varint(42)` comes back as a field value below; confirm against `Encoder`.
        enc.write_event(
            &schema,
            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
        )
        .unwrap();
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        let frames = dec.decode_all();
        // Schema frame followed by the event frame.
        assert_eq!(frames.len(), 2);
        if let DecodedFrame::Event { values, .. } = &frames[1] {
            assert_eq!(*values, vec![FieldValue::Varint(42)]);
        } else {
            panic!("expected event");
        }
    }

    /// Decoding a string-pool frame makes the interned string resolvable by id.
    #[test]
    fn decode_string_pool_builds_map() {
        let mut enc = Encoder::new();
        let id = enc.intern_string("hello").unwrap();
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        dec.decode_all();
        assert_eq!(dec.string_pool().get(id), Some("hello"));
    }

    /// Ten events plus their schema decode to eleven frames.
    #[test]
    fn decode_multiple_events() {
        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "Ev",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();
        for i in 0..10u64 {
            enc.write_event(
                &schema,
                &[FieldValue::Varint(i * 1000), FieldValue::Varint(i)],
            )
            .unwrap();
        }
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        let frames = dec.decode_all();
        assert_eq!(frames.len(), 11);
    }

    /// A buffer whose header is rejected yields no decoder.
    #[test]
    fn bad_header_returns_none() {
        assert!(Decoder::new(&[0x00, 0x00, 0x00, 0x00, 1]).is_none());
    }

    /// The `Iterator` impl on `Decoder` yields schema and event frames alike.
    #[test]
    fn iterator_yields_all_frames() {
        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "Ev",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();
        for i in 0..3u64 {
            enc.write_event(
                &schema,
                &[FieldValue::Varint(i * 1000), FieldValue::Varint(i)],
            )
            .unwrap();
        }
        let data = enc.finish();

        let dec = Decoder::new(&data).unwrap();
        let frames: Vec<_> = dec.collect::<Result<Vec<_>, _>>().unwrap();
        // 1 schema + 3 events
        assert_eq!(frames.len(), 4);
        assert!(matches!(frames[0], DecodedFrameRef::Schema(_)));
        assert!(matches!(frames[1], DecodedFrameRef::Event { .. }));
    }

    /// Stopping iteration early leaves the remaining frames available.
    #[test]
    fn iterator_early_termination() {
        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "Ev",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();
        for i in 0..10u64 {
            enc.write_event(
                &schema,
                &[FieldValue::Varint(i * 1000), FieldValue::Varint(i)],
            )
            .unwrap();
        }
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        // Take just 2 frames (schema + first event), don't decode the rest
        let first_two: Vec<_> = dec.by_ref().take(2).collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(first_two.len(), 2);
        // Decoder should still have remaining data
        let next = dec.next();
        assert!(next.is_some());
    }

    /// `events()` yields the event frames and leaves out the schema frame.
    #[test]
    fn events_iterator_skips_schema() {
        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "Ev",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();
        enc.write_event(
            &schema,
            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
        )
        .unwrap();
        enc.write_event(
            &schema,
            &[FieldValue::Varint(2_000), FieldValue::Varint(99)],
        )
        .unwrap();
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        let events: Vec<_> = dec.events().collect::<Result<Vec<_>, _>>().unwrap();
        // Only events, no schema frame
        assert_eq!(events.len(), 2);
        for ev in &events {
            assert!(matches!(ev, DecodedFrameRef::Event { .. }));
        }
    }

    /// The first item from `events()` is an event even though a schema frame precedes it.
    #[test]
    fn events_iterator_first_event_only() {
        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "Ev",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();
        for i in 0..5u64 {
            enc.write_event(
                &schema,
                &[FieldValue::Varint(i * 1000), FieldValue::Varint(i)],
            )
            .unwrap();
        }
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        // Get just the first event — schema is consumed internally
        let first = dec.events().next().unwrap().unwrap();
        assert!(matches!(first, DecodedFrameRef::Event { .. }));
    }
}
683}