Skip to main content

dial9_trace_format/
decoder.rs

1//! Streaming decoder for reading trace files.
2//!
//! [`Decoder`] reads the file header, processes schema, string-pool, and
//! stack-pool frames, and yields events as [`DecodedFrame`] (owned) or
//! [`DecodedFrameRef`] (zero-copy). It also implements [`Iterator`] and
//! provides a [`for_each_event`](Decoder::for_each_event) callback API that
//! reuses a single field buffer instead of allocating a `Vec` per event.
8
9use crate::codec::{
10    self, Frame, FrameRef, HEADER_SIZE, PoolEntry, PoolEntryRef, SchemaInfo, StackPoolEntry,
11    StackPoolEntryRef, WireTypeId,
12};
13use crate::schema::{SchemaEntry, SchemaRegistry};
14use crate::types::{FieldType, FieldValueRef, InternedStackFrames, InternedString, StackFrames};
15use std::collections::HashMap;
16use std::fmt;
17
/// Error returned when the decoder cannot continue reading the stream.
///
/// Frames carry no length prefix, so once one fails to decode there is no way
/// to find where the next frame begins: a `DecodeError` is terminal for the
/// stream being read.
#[derive(Debug, Clone)]
pub struct DecodeError {
    /// Byte offset into the input buffer that the failure is reported at.
    pub pos: usize,
    /// Human-readable description of the failure.
    pub message: String,
}

impl fmt::Display for DecodeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { pos, message } = self;
        write!(f, "decode error at byte {pos}: {message}")
    }
}

impl std::error::Error for DecodeError {}
34
35/// Error returned by [`Decoder::try_for_each_event`].
36#[derive(Debug)]
37pub enum TryForEachError<E> {
38    Decode(DecodeError),
39    User(E),
40}
41
42impl<E: fmt::Display> fmt::Display for TryForEachError<E> {
43    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44        match self {
45            TryForEachError::Decode(e) => write!(f, "{e}"),
46            TryForEachError::User(e) => write!(f, "{e}"),
47        }
48    }
49}
50
51impl<E: fmt::Display + fmt::Debug> std::error::Error for TryForEachError<E> {}
52
53/// A decoded event passed to [`Decoder::for_each_event`].
54///
55/// `'a` is the lifetime of the input data buffer (strings, stack frames borrow from it).
56/// `'f` is the lifetime of the `fields` slice and schema name (reused across calls).
57#[non_exhaustive]
58pub struct RawEvent<'a, 'f> {
59    pub type_id: WireTypeId,
60    pub name: &'f str,
61    pub timestamp_ns: Option<u64>,
62    pub fields: &'f [FieldValueRef<'a>],
63    pub schema: &'f SchemaEntry,
64    pub string_pool: &'f StringPool,
65    pub stack_pool: &'f StackPool,
66}
67
68impl<'a, 'f> RawEvent<'a, 'f> {
69    /// Field names from the schema, parallel to `fields`.
70    pub fn field_names(&self) -> impl Iterator<Item = &'f str> {
71        self.schema.fields.iter().map(|f| f.name.as_str())
72    }
73}
74
75/// A map from interned string IDs to their resolved string values.
76///
77/// Populated automatically by the [`Decoder`] as it processes `StringPool` frames.
78/// Pass a reference to [`crate::TraceEvent::decode`] so that `InternedString` fields
79/// resolve to `&str` in derived `Ref` types.
80#[derive(Debug, Default)]
81pub struct StringPool(pub(crate) HashMap<InternedString, String>);
82
83impl StringPool {
84    pub(crate) fn new() -> Self {
85        Self(HashMap::default())
86    }
87
88    pub(crate) fn insert(&mut self, id: InternedString, value: String) {
89        self.0.insert(id, value);
90    }
91
92    pub fn get(&self, id: InternedString) -> Option<&str> {
93        self.0.get(&id).map(|s| s.as_str())
94    }
95
96    pub fn len(&self) -> usize {
97        self.0.len()
98    }
99
100    pub fn is_empty(&self) -> bool {
101        self.0.is_empty()
102    }
103
104    /// Iterate over all interned strings as `(id, value)` pairs.
105    pub fn iter(&self) -> impl Iterator<Item = (InternedString, &str)> {
106        self.0.iter().map(|(&id, v)| (id, v.as_str()))
107    }
108}
109
110/// A map from interned stack-frame IDs to their resolved address vectors.
111///
112/// Populated automatically by the [`Decoder`] as it processes `StackPool` frames.
113#[derive(Debug, Default)]
114pub struct StackPool(pub(crate) HashMap<InternedStackFrames, Vec<u64>>);
115
116impl StackPool {
117    pub(crate) fn new() -> Self {
118        Self(HashMap::default())
119    }
120
121    pub(crate) fn insert(&mut self, id: InternedStackFrames, frames: StackFrames) {
122        self.0.insert(id, frames.0);
123    }
124
125    pub fn get(&self, id: InternedStackFrames) -> Option<&[u64]> {
126        self.0.get(&id).map(|v| v.as_slice())
127    }
128
129    pub fn len(&self) -> usize {
130        self.0.len()
131    }
132
133    pub fn is_empty(&self) -> bool {
134        self.0.is_empty()
135    }
136
137    /// Iterate over all interned stack frames as `(id, frames)` pairs.
138    pub fn iter(&self) -> impl Iterator<Item = (InternedStackFrames, &[u64])> {
139        self.0.iter().map(|(&id, v)| (id, v.as_slice()))
140    }
141}
142
143/// Decoded events yielded by the decoder.
144#[derive(Debug, Clone, PartialEq)]
145pub enum DecodedFrame {
146    Schema(SchemaEntry),
147    Event {
148        type_id: WireTypeId,
149        /// Absolute timestamp in nanoseconds, if the schema has `has_timestamp`.
150        timestamp_ns: Option<u64>,
151        values: Vec<crate::types::FieldValue>,
152    },
153    StringPool(Vec<PoolEntry>),
154    StackPool(Vec<StackPoolEntry>),
155}
156
157/// Zero-copy decoded frame that borrows from the input buffer.
158#[derive(Debug, Clone, PartialEq)]
159pub enum DecodedFrameRef<'a> {
160    Schema(SchemaEntry),
161    Event {
162        type_id: WireTypeId,
163        timestamp_ns: Option<u64>,
164        values: Vec<FieldValueRef<'a>>,
165    },
166    StringPool(Vec<PoolEntryRef<'a>>),
167    StackPool(Vec<StackPoolEntryRef<'a>>),
168}
169
170struct SchemaCache {
171    entry: SchemaEntry,
172    /// Raw field type tags for fast decode (avoids re-extracting from entry.fields).
173    field_tags: Vec<u8>,
174}
175
176/// Streaming trace file decoder.
177///
178/// Reads from a byte slice, processing schema, string-pool, and event frames.
179/// Implements [`Iterator`] over [`DecodedFrameRef`] for convenient consumption.
180pub struct Decoder<'a> {
181    data: &'a [u8],
182    pos: usize,
183    registry: SchemaRegistry,
184    schema_cache: Vec<Option<SchemaCache>>,
185    string_pool: StringPool,
186    stack_pool: StackPool,
187    version: u8,
188    timestamp_base_ns: u64,
189}
190
191impl<'a> Decoder<'a> {
192    pub fn new(data: &'a [u8]) -> Option<Self> {
193        let version = codec::decode_header(data)?;
194        Some(Self {
195            data,
196            pos: HEADER_SIZE,
197            registry: SchemaRegistry::new(),
198            schema_cache: Vec::new(),
199            string_pool: StringPool::new(),
200            stack_pool: StackPool::new(),
201            version,
202            timestamp_base_ns: 0,
203        })
204    }
205
206    pub fn registry(&self) -> &SchemaRegistry {
207        &self.registry
208    }
209
210    pub fn version(&self) -> u8 {
211        self.version
212    }
213
214    pub fn string_pool(&self) -> &StringPool {
215        &self.string_pool
216    }
217
218    pub fn stack_pool(&self) -> &StackPool {
219        &self.stack_pool
220    }
221
222    /// Reset decoder state (schemas, string pool, timestamp base) as if
223    /// starting a fresh stream. Used when a mid-stream header is encountered
224    /// (the "reset frame" pattern for concatenated thread-local batches).
225    fn reset_state(&mut self) {
226        self.registry = SchemaRegistry::new();
227        self.schema_cache.clear();
228        self.string_pool = StringPool::new();
229        self.stack_pool = StackPool::new();
230        self.timestamp_base_ns = 0;
231    }
232
233    /// If the current position starts with a valid header, reset state and
234    /// skip past it, returning true.
235    fn try_consume_reset_header(&mut self) -> bool {
236        if self.pos + HEADER_SIZE <= self.data.len()
237            && codec::decode_header(&self.data[self.pos..]).is_some()
238        {
239            self.reset_state();
240            self.pos += HEADER_SIZE;
241            true
242        } else {
243            false
244        }
245    }
246
247    /// Consume this decoder and create an [`Encoder`](crate::encoder::Encoder) that appends to the
248    /// decoded trace. The encoder inherits the string pool, schema registry,
249    /// and timestamp base so new frames are compatible with the existing data.
250    ///
251    /// No file header is written — the caller is responsible for concatenating
252    /// the encoder's output after the original trace bytes.
253    pub fn into_encoder<W: std::io::Write>(self, writer: W) -> crate::encoder::Encoder<W> {
254        crate::encoder::Encoder::from_decoder(
255            self.registry,
256            self.string_pool,
257            self.stack_pool,
258            self.timestamp_base_ns,
259            writer,
260        )
261    }
262
263    pub(crate) fn schema_info(&self, type_id: WireTypeId) -> Option<SchemaInfo<'_>> {
264        self.schema_cache
265            .get(type_id.0 as usize)
266            .and_then(|s| s.as_ref())
267            .map(|c| SchemaInfo {
268                field_tags: &c.field_tags,
269                has_timestamp: c.entry.has_timestamp,
270            })
271    }
272
273    fn register_schema(&mut self, type_id: WireTypeId, entry: SchemaEntry) -> Result<(), String> {
274        let idx = type_id.0 as usize;
275        if idx >= self.schema_cache.len() {
276            self.schema_cache.resize_with(idx + 1, || None);
277        }
278        self.schema_cache[idx] = Some(SchemaCache {
279            field_tags: entry.fields.iter().map(|f| f.field_type as u8).collect(),
280            entry: entry.clone(),
281        });
282        self.registry.register(type_id, entry)
283    }
284
285    /// Decode the next frame. Returns `Ok(None)` when stream is exhausted.
286    /// Returns `Err` if the stream is malformed (e.g. duplicate type_id with
287    /// a different schema).
288    pub fn next_frame(&mut self) -> Result<Option<DecodedFrame>, DecodeError> {
289        if self.pos >= self.data.len() {
290            return Ok(None);
291        }
292        if self.try_consume_reset_header() {
293            return self.next_frame();
294        }
295        let remaining = &self.data[self.pos..];
296        let base = self.timestamp_base_ns;
297        let (frame, consumed) =
298            match codec::decode_frame(remaining, |type_id| self.schema_info(type_id), base) {
299                Some(r) => r,
300                None => return Ok(None),
301            };
302        self.pos += consumed;
303        match frame {
304            Frame::Schema { type_id, entry } => {
305                let result = DecodedFrame::Schema(entry.clone());
306                self.register_schema(type_id, entry)
307                    .map_err(|msg| DecodeError {
308                        pos: self.pos,
309                        message: msg,
310                    })?;
311                Ok(Some(result))
312            }
313            Frame::Event {
314                type_id,
315                timestamp_ns,
316                values,
317            } => {
318                if let Some(ts) = timestamp_ns {
319                    self.timestamp_base_ns = ts;
320                }
321                Ok(Some(DecodedFrame::Event {
322                    type_id,
323                    timestamp_ns,
324                    values,
325                }))
326            }
327            Frame::StringPool(entries) => {
328                for e in &entries {
329                    if let Ok(s) = String::from_utf8(e.data.clone()) {
330                        self.string_pool.insert(InternedString(e.pool_id), s);
331                    }
332                }
333                Ok(Some(DecodedFrame::StringPool(entries)))
334            }
335            Frame::StackPool(entries) => {
336                for e in &entries {
337                    self.stack_pool
338                        .insert(InternedStackFrames(e.pool_id), e.frames.clone().into());
339                }
340                Ok(Some(DecodedFrame::StackPool(entries)))
341            }
342            Frame::TimestampReset(ts) => {
343                self.timestamp_base_ns = ts;
344                self.next_frame() // consume silently, return next real frame
345            }
346        }
347    }
348
349    /// Collect all remaining frames. Stops on error or end of stream.
350    pub fn decode_all(&mut self) -> Vec<DecodedFrame> {
351        let mut frames = Vec::new();
352        while let Ok(Some(f)) = self.next_frame() {
353            frames.push(f);
354        }
355        frames
356    }
357
358    /// Decode the next frame without copying field data. Returns `Ok(None)` when
359    /// stream is exhausted. Returns `Err` on malformed data.
360    pub fn next_frame_ref(&mut self) -> Result<Option<DecodedFrameRef<'a>>, DecodeError> {
361        if self.pos >= self.data.len() {
362            return Ok(None);
363        }
364        if self.try_consume_reset_header() {
365            return self.next_frame_ref();
366        }
367        let remaining = &self.data[self.pos..];
368        let base = self.timestamp_base_ns;
369        let (frame, consumed) =
370            match codec::decode_frame_ref(remaining, |type_id| self.schema_info(type_id), base) {
371                Some(r) => r,
372                None => return Ok(None),
373            };
374        self.pos += consumed;
375        match frame {
376            FrameRef::Schema { type_id, entry } => {
377                let result = DecodedFrameRef::Schema(entry.clone());
378                self.register_schema(type_id, entry)
379                    .map_err(|msg| DecodeError {
380                        pos: self.pos,
381                        message: msg,
382                    })?;
383                Ok(Some(result))
384            }
385            FrameRef::Event {
386                type_id,
387                timestamp_ns,
388                values,
389            } => {
390                if let Some(ts) = timestamp_ns {
391                    self.timestamp_base_ns = ts;
392                }
393                Ok(Some(DecodedFrameRef::Event {
394                    type_id,
395                    timestamp_ns,
396                    values,
397                }))
398            }
399            FrameRef::StringPool(entries) => {
400                for e in &entries {
401                    if let Ok(s) = std::str::from_utf8(e.data) {
402                        self.string_pool
403                            .insert(InternedString(e.pool_id), s.to_string());
404                    }
405                }
406                Ok(Some(DecodedFrameRef::StringPool(entries)))
407            }
408            FrameRef::StackPool(entries) => {
409                for e in &entries {
410                    self.stack_pool
411                        .insert(InternedStackFrames(e.pool_id), e.to_stack_frames());
412                }
413                Ok(Some(DecodedFrameRef::StackPool(entries)))
414            }
415            FrameRef::TimestampReset(ts) => {
416                self.timestamp_base_ns = ts;
417                self.next_frame_ref()
418            }
419        }
420    }
421
422    /// Collect all remaining frames using zero-copy decoding. Stops on error or end of stream.
423    pub fn decode_all_ref(&mut self) -> Vec<DecodedFrameRef<'a>> {
424        let mut frames = Vec::new();
425        while let Ok(Some(f)) = self.next_frame_ref() {
426            frames.push(f);
427        }
428        frames
429    }
430
431    /// Process all events with a callback, avoiding per-event Vec allocations.
432    /// Schemas and string pools are registered automatically.
433    ///
434    /// The [`RawEvent`] passed to the callback borrows from the decoder's input
435    /// buffer. The `fields` slice is reused across calls, so values cannot be
436    /// stored across iterations without copying.
437    ///
438    /// Returns `Err` if the stream is malformed.
439    pub fn for_each_event(
440        &mut self,
441        mut f: impl for<'f> FnMut(RawEvent<'a, 'f>),
442    ) -> Result<(), DecodeError> {
443        self.try_for_each_event(|ev| {
444            f(ev);
445            Ok::<(), std::convert::Infallible>(())
446        })
447        .map_err(|e| match e {
448            TryForEachError::Decode(d) => d,
449            TryForEachError::User(inf) => match inf {},
450        })
451    }
452
453    /// Like [`for_each_event`](Self::for_each_event), but the callback may
454    /// return an error to stop iteration early.
455    pub fn try_for_each_event<E>(
456        &mut self,
457        mut f: impl for<'f> FnMut(RawEvent<'a, 'f>) -> Result<(), E>,
458    ) -> Result<(), TryForEachError<E>> {
459        let mut values_buf: Vec<FieldValueRef<'a>> = Vec::new();
460        while self.pos < self.data.len() {
461            let remaining = &self.data[self.pos..];
462            let tag = match remaining.first() {
463                Some(t) => *t,
464                None => break,
465            };
466            match tag {
467                codec::TAG_EVENT => {
468                    let mut pos = 1;
469                    let type_id = match remaining.get(pos..pos + 2) {
470                        Some(b) => {
471                            pos += 2;
472                            WireTypeId(u16::from_le_bytes(b.try_into().unwrap()))
473                        }
474                        None => {
475                            return Err(TryForEachError::Decode(DecodeError {
476                                pos: self.pos,
477                                message: "truncated event frame".into(),
478                            }));
479                        }
480                    };
481                    let cache = match self
482                        .schema_cache
483                        .get(type_id.0 as usize)
484                        .and_then(|s| s.as_ref())
485                    {
486                        Some(c) => c,
487                        None => {
488                            return Err(TryForEachError::Decode(DecodeError {
489                                pos: self.pos,
490                                message: format!("unknown type_id {type_id:?}"),
491                            }));
492                        }
493                    };
494
495                    let timestamp_ns = if cache.entry.has_timestamp {
496                        match codec::decode_u24_le(&remaining[pos..]) {
497                            Some(delta) => {
498                                pos += 3;
499                                Some(self.timestamp_base_ns + delta as u64)
500                            }
501                            None => {
502                                return Err(TryForEachError::Decode(DecodeError {
503                                    pos: self.pos + pos,
504                                    message: "truncated timestamp delta".into(),
505                                }));
506                            }
507                        }
508                    } else {
509                        None
510                    };
511
512                    values_buf.clear();
513                    for &ftag in &cache.field_tags {
514                        let inner_type = match FieldType::from_tag(ftag) {
515                            Some(ft) => ft,
516                            None => {
517                                return Err(TryForEachError::Decode(DecodeError {
518                                    pos: self.pos + pos,
519                                    message: format!("unknown field type tag {ftag:#x}"),
520                                }));
521                            }
522                        };
523                        if inner_type.is_optional() {
524                            match remaining.get(pos) {
525                                Some(0x00) => {
526                                    values_buf.push(FieldValueRef::None);
527                                    pos += 1;
528                                }
529                                Some(_) => {
530                                    pos += 1;
531                                    match FieldValueRef::decode(inner_type.inner(), remaining, pos)
532                                    {
533                                        Some((val, consumed)) => {
534                                            values_buf.push(val);
535                                            pos += consumed;
536                                        }
537                                        None => {
538                                            return Err(TryForEachError::Decode(DecodeError {
539                                                pos: self.pos + pos,
540                                                message: "truncated optional field value".into(),
541                                            }));
542                                        }
543                                    }
544                                }
545                                None => {
546                                    return Err(TryForEachError::Decode(DecodeError {
547                                        pos: self.pos + pos,
548                                        message: "truncated optional field prefix".into(),
549                                    }));
550                                }
551                            }
552                        } else {
553                            match FieldValueRef::decode(inner_type, remaining, pos) {
554                                Some((val, consumed)) => {
555                                    values_buf.push(val);
556                                    pos += consumed;
557                                }
558                                None => {
559                                    return Err(TryForEachError::Decode(DecodeError {
560                                        pos: self.pos + pos,
561                                        message: "truncated field value".into(),
562                                    }));
563                                }
564                            }
565                        }
566                    }
567                    // Update mutable state. The borrow checker allows this
568                    // because `cache` borrows `self.schema_cache` while we
569                    // mutate `self.pos` and `self.timestamp_base_ns`, which
570                    // are disjoint fields. We use a block with destructured
571                    // refs to make this explicit.
572                    {
573                        let Self {
574                            pos: self_pos,
575                            timestamp_base_ns,
576                            ..
577                        } = self;
578                        *self_pos += pos;
579                        if let Some(ts) = timestamp_ns {
580                            *timestamp_base_ns = ts;
581                        }
582                    }
583                    f(RawEvent {
584                        type_id,
585                        name: &cache.entry.name,
586                        timestamp_ns,
587                        fields: &values_buf,
588                        schema: &cache.entry,
589                        string_pool: &self.string_pool,
590                        stack_pool: &self.stack_pool,
591                    })
592                    .map_err(TryForEachError::User)?;
593                }
594                codec::TAG_TIMESTAMP_RESET => {
595                    let ts = match self.data.get(self.pos + 1..self.pos + 9) {
596                        Some(b) => u64::from_le_bytes(b.try_into().unwrap()),
597                        None => {
598                            return Err(TryForEachError::Decode(DecodeError {
599                                pos: self.pos,
600                                message: "truncated timestamp reset".into(),
601                            }));
602                        }
603                    };
604                    self.timestamp_base_ns = ts;
605                    self.pos += 9;
606                }
607                _ => {
608                    // Mid-stream header = reset frame (tag 0x54 = 'T' from TRC\0)
609                    if tag == codec::MAGIC[0] && self.try_consume_reset_header() {
610                        continue;
611                    }
612                    match self.next_frame_ref() {
613                        Ok(Some(_)) => {}
614                        Ok(None) => {
615                            return Err(TryForEachError::Decode(DecodeError {
616                                pos: self.pos,
617                                message: format!("failed to decode frame with tag 0x{tag:02x}"),
618                            }));
619                        }
620                        Err(e) => return Err(TryForEachError::Decode(e)),
621                    }
622                }
623            }
624        }
625        Ok(())
626    }
627
628    /// Returns an iterator that yields only [`DecodedFrameRef::Event`] variants,
629    /// silently consuming schema, string-pool, and symbol-table frames
630    /// (while still updating internal decoder state).
631    pub fn events(&mut self) -> EventIter<'_, 'a> {
632        EventIter { decoder: self }
633    }
634}
635
636impl<'a> Iterator for Decoder<'a> {
637    type Item = Result<DecodedFrameRef<'a>, DecodeError>;
638
639    fn next(&mut self) -> Option<Self::Item> {
640        self.next_frame_ref().transpose()
641    }
642}
643
644/// Iterator that yields only [`DecodedFrameRef::Event`] frames,
645/// consuming non-event frames to keep decoder state up to date.
646pub struct EventIter<'d, 'a> {
647    decoder: &'d mut Decoder<'a>,
648}
649
650impl<'d, 'a> Iterator for EventIter<'d, 'a> {
651    type Item = Result<DecodedFrameRef<'a>, DecodeError>;
652
653    fn next(&mut self) -> Option<Self::Item> {
654        loop {
655            match self.decoder.next()? {
656                Ok(frame @ DecodedFrameRef::Event { .. }) => return Some(Ok(frame)),
657                Ok(_) => continue, // schema, string pool, symbol table — skip
658                Err(e) => return Some(Err(e)),
659            }
660        }
661    }
662}
663
#[cfg(test)]
mod tests {
    use super::*;
    use crate::encoder::Encoder;
    use crate::schema::FieldDef;
    use crate::types::{FieldType, FieldValue};

    /// Field list shared by every test schema: a single varint named `v`.
    fn varint_fields() -> Vec<FieldDef> {
        vec![FieldDef {
            name: "v".into(),
            field_type: FieldType::Varint,
        }]
    }

    #[test]
    fn decode_empty_stream() {
        let enc = Encoder::new();
        let data = enc.finish();
        let mut dec = Decoder::new(&data).unwrap();
        assert_eq!(dec.version(), 1);
        assert!(dec.next_frame().unwrap().is_none());
    }

    #[test]
    fn decode_schema_frame() {
        let mut enc = Encoder::new();
        enc.register_schema("Ev", varint_fields()).unwrap();
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        match dec.next_frame().unwrap() {
            Some(DecodedFrame::Schema(s)) => assert_eq!(s.name, "Ev"),
            other => panic!("expected schema frame named Ev, got {other:?}"),
        }
    }

    #[test]
    fn decode_event_after_schema() {
        let mut enc = Encoder::new();
        let schema = enc.register_schema("Ev", varint_fields()).unwrap();
        enc.write_event(
            &schema,
            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
        )
        .unwrap();
        let data = enc.finish();

        let frames = Decoder::new(&data).unwrap().decode_all();
        assert_eq!(frames.len(), 2);
        match &frames[1] {
            DecodedFrame::Event { values, .. } => {
                assert_eq!(*values, vec![FieldValue::Varint(42)]);
            }
            _ => panic!("expected event"),
        }
    }

    #[test]
    fn decode_string_pool_builds_map() {
        let mut enc = Encoder::new();
        let id = enc.intern_string("hello").unwrap();
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        dec.decode_all();
        assert_eq!(dec.string_pool().get(id), Some("hello"));
    }

    #[test]
    fn decode_multiple_events() {
        let mut enc = Encoder::new();
        let schema = enc.register_schema("Ev", varint_fields()).unwrap();
        (0..10u64).for_each(|i| {
            enc.write_event(
                &schema,
                &[FieldValue::Varint(i * 1000), FieldValue::Varint(i)],
            )
            .unwrap();
        });
        let data = enc.finish();

        // 1 schema + 10 events
        assert_eq!(Decoder::new(&data).unwrap().decode_all().len(), 11);
    }

    #[test]
    fn bad_header_returns_none() {
        let bogus = [0u8, 0, 0, 0, 1];
        assert!(Decoder::new(&bogus).is_none());
    }

    #[test]
    fn iterator_yields_all_frames() {
        let mut enc = Encoder::new();
        let schema = enc.register_schema("Ev", varint_fields()).unwrap();
        (0..3u64).for_each(|i| {
            enc.write_event(
                &schema,
                &[FieldValue::Varint(i * 1000), FieldValue::Varint(i)],
            )
            .unwrap();
        });
        let data = enc.finish();

        let frames = Decoder::new(&data)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        // 1 schema + 3 events
        assert_eq!(frames.len(), 4);
        assert!(matches!(frames[0], DecodedFrameRef::Schema(_)));
        assert!(matches!(frames[1], DecodedFrameRef::Event { .. }));
    }

    #[test]
    fn iterator_early_termination() {
        let mut enc = Encoder::new();
        let schema = enc.register_schema("Ev", varint_fields()).unwrap();
        (0..10u64).for_each(|i| {
            enc.write_event(
                &schema,
                &[FieldValue::Varint(i * 1000), FieldValue::Varint(i)],
            )
            .unwrap();
        });
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        // Pull only the schema and the first event; leave the rest undecoded.
        let first_two = dec
            .by_ref()
            .take(2)
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(first_two.len(), 2);
        // The decoder must still have frames left to yield.
        assert!(dec.next().is_some());
    }

    #[test]
    fn events_iterator_skips_schema() {
        let mut enc = Encoder::new();
        let schema = enc.register_schema("Ev", varint_fields()).unwrap();
        for (ts, v) in [(1_000, 42), (2_000, 99)] {
            enc.write_event(&schema, &[FieldValue::Varint(ts), FieldValue::Varint(v)])
                .unwrap();
        }
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        let events = dec.events().collect::<Result<Vec<_>, _>>().unwrap();
        // The schema frame is consumed internally; only the two events remain.
        assert_eq!(events.len(), 2);
        assert!(events
            .iter()
            .all(|ev| matches!(ev, DecodedFrameRef::Event { .. })));
    }

    #[test]
    fn events_iterator_first_event_only() {
        let mut enc = Encoder::new();
        let schema = enc.register_schema("Ev", varint_fields()).unwrap();
        (0..5u64).for_each(|i| {
            enc.write_event(
                &schema,
                &[FieldValue::Varint(i * 1000), FieldValue::Varint(i)],
            )
            .unwrap();
        });
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        // The schema frame is consumed internally before the first event.
        let first = dec.events().next().unwrap().unwrap();
        assert!(matches!(first, DecodedFrameRef::Event { .. }));
    }
}