Skip to main content

dial9_trace_format/
decoder.rs

//! Streaming decoder for reading trace files.
//!
//! [`Decoder`] reads the file header, processes schema and string-pool frames,
//! and yields events as [`DecodedFrame`] (owned) or [`DecodedFrameRef`]
//! (zero-copy). It also implements [`Iterator`] and provides a
//! [`for_each_event`](Decoder::for_each_event) callback API for
//! allocation-free processing.
8
9use crate::codec::{
10    self, Frame, FrameRef, HEADER_SIZE, PoolEntry, PoolEntryRef, SchemaInfo, StackPoolEntry,
11    StackPoolEntryRef, WireTypeId,
12};
13use crate::schema::{SchemaEntry, SchemaRegistry};
14use crate::types::{FieldType, FieldValueRef, InternedStackFrames, InternedString, StackFrames};
15use std::collections::HashMap;
16use std::fmt;
17
/// Failure that stops the decoder from reading any further.
///
/// Frames carry no length prefix, so once a frame fails to decode there is no
/// way to skip over it and locate the next one: the error is unrecoverable.
#[derive(Debug, Clone)]
pub struct DecodeError {
    /// Byte offset in the input that the decoder reported for the failure.
    pub pos: usize,
    /// Human-readable description of what went wrong.
    pub message: String,
}

impl fmt::Display for DecodeError {
    /// Renders as `decode error at byte <pos>: <message>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { pos, message } = self;
        write!(f, "decode error at byte {pos}: {message}")
    }
}

impl std::error::Error for DecodeError {}
34
35/// Error returned by [`Decoder::try_for_each_event`].
36#[derive(Debug)]
37pub enum TryForEachError<E> {
38    Decode(DecodeError),
39    User(E),
40}
41
42impl<E: fmt::Display> fmt::Display for TryForEachError<E> {
43    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44        match self {
45            TryForEachError::Decode(e) => write!(f, "{e}"),
46            TryForEachError::User(e) => write!(f, "{e}"),
47        }
48    }
49}
50
51impl<E: fmt::Display + fmt::Debug> std::error::Error for TryForEachError<E> {}
52
53/// A decoded event passed to [`Decoder::for_each_event`].
54///
55/// `'a` is the lifetime of the input data buffer (strings, stack frames borrow from it).
56/// `'f` is the lifetime of the `fields` slice and schema name (reused across calls).
57#[non_exhaustive]
58pub struct RawEvent<'a, 'f> {
59    pub type_id: WireTypeId,
60    pub name: &'f str,
61    pub timestamp_ns: Option<u64>,
62    pub fields: &'f [FieldValueRef<'a>],
63    pub schema: &'f SchemaEntry,
64    pub string_pool: &'f StringPool,
65    pub stack_pool: &'f StackPool,
66}
67
68impl<'a, 'f> RawEvent<'a, 'f> {
69    /// Field names from the schema, parallel to `fields`.
70    pub fn field_names(&self) -> impl Iterator<Item = &'f str> {
71        self.schema.fields.iter().map(|f| f.name.as_str())
72    }
73
74    /// Deserialize this event into a typed value `E` via serde.
75    ///
76    /// The deserializer presents the event as a flat map containing:
77    ///
78    /// 1. `"event"` → the schema name (the discriminant for
79    ///    `#[serde(tag = "event")]`).
80    /// 2. `"timestamp_ns"` → the absolute frame-header timestamp (only if
81    ///    the schema has `has_timestamp = true`).
82    /// 3. One entry per schema field, keyed by field name.
83    ///
84    /// Pool-resolved values appear as their resolved form: `PooledString`
85    /// presents as a string, `PooledStackFrames` presents as a sequence of
86    /// `u64`. See [`crate::de`] for details.
87    ///
88    /// Available only when the `serde-deserialize` feature is enabled.
89    #[cfg(feature = "serde-deserialize")]
90    pub fn deserialize<E: serde::de::DeserializeOwned>(&self) -> Result<E, crate::de::DeserError> {
91        crate::de::from_raw_event(self)
92    }
93}
94
95/// A map from interned string IDs to their resolved string values.
96///
97/// Populated automatically by the [`Decoder`] as it processes `StringPool` frames.
98/// Pass a reference to [`crate::TraceEvent::decode`] so that `InternedString` fields
99/// resolve to `&str` in derived `Ref` types.
100#[derive(Debug, Default)]
101pub struct StringPool(pub(crate) HashMap<InternedString, String>);
102
103impl StringPool {
104    pub(crate) fn new() -> Self {
105        Self(HashMap::default())
106    }
107
108    pub(crate) fn insert(&mut self, id: InternedString, value: String) {
109        self.0.insert(id, value);
110    }
111
112    pub fn get(&self, id: InternedString) -> Option<&str> {
113        self.0.get(&id).map(|s| s.as_str())
114    }
115
116    pub fn len(&self) -> usize {
117        self.0.len()
118    }
119
120    pub fn is_empty(&self) -> bool {
121        self.0.is_empty()
122    }
123
124    /// Iterate over all interned strings as `(id, value)` pairs.
125    pub fn iter(&self) -> impl Iterator<Item = (InternedString, &str)> {
126        self.0.iter().map(|(&id, v)| (id, v.as_str()))
127    }
128}
129
130/// A map from interned stack-frame IDs to their resolved address vectors.
131///
132/// Populated automatically by the [`Decoder`] as it processes `StackPool` frames.
133#[derive(Debug, Default)]
134pub struct StackPool(pub(crate) HashMap<InternedStackFrames, Vec<u64>>);
135
136impl StackPool {
137    pub(crate) fn new() -> Self {
138        Self(HashMap::default())
139    }
140
141    pub(crate) fn insert(&mut self, id: InternedStackFrames, frames: StackFrames) {
142        self.0.insert(id, frames.0);
143    }
144
145    pub fn get(&self, id: InternedStackFrames) -> Option<&[u64]> {
146        self.0.get(&id).map(|v| v.as_slice())
147    }
148
149    pub fn len(&self) -> usize {
150        self.0.len()
151    }
152
153    pub fn is_empty(&self) -> bool {
154        self.0.is_empty()
155    }
156
157    /// Iterate over all interned stack frames as `(id, frames)` pairs.
158    pub fn iter(&self) -> impl Iterator<Item = (InternedStackFrames, &[u64])> {
159        self.0.iter().map(|(&id, v)| (id, v.as_slice()))
160    }
161}
162
163/// Decoded events yielded by the decoder.
164#[derive(Debug, Clone, PartialEq)]
165pub enum DecodedFrame {
166    Schema(SchemaEntry),
167    Event {
168        type_id: WireTypeId,
169        /// Absolute timestamp in nanoseconds, if the schema has `has_timestamp`.
170        timestamp_ns: Option<u64>,
171        values: Vec<crate::types::FieldValue>,
172    },
173    StringPool(Vec<PoolEntry>),
174    StackPool(Vec<StackPoolEntry>),
175    SchemaAnnotations {
176        type_id: WireTypeId,
177        annotations: Vec<crate::schema::FieldAnnotation>,
178    },
179}
180
181/// Zero-copy decoded frame that borrows from the input buffer.
182#[derive(Debug, Clone, PartialEq)]
183pub enum DecodedFrameRef<'a> {
184    Schema(SchemaEntry),
185    Event {
186        type_id: WireTypeId,
187        timestamp_ns: Option<u64>,
188        values: Vec<FieldValueRef<'a>>,
189    },
190    StringPool(Vec<PoolEntryRef<'a>>),
191    StackPool(Vec<StackPoolEntryRef<'a>>),
192    SchemaAnnotations {
193        type_id: WireTypeId,
194        annotations: Vec<crate::schema::FieldAnnotation>,
195    },
196}
197
198struct SchemaCache {
199    entry: SchemaEntry,
200    /// Raw field type tags for fast decode (avoids re-extracting from entry.fields).
201    field_tags: Vec<u8>,
202}
203
204/// Streaming trace file decoder.
205///
206/// Reads from a byte slice, processing schema, string-pool, and event frames.
207/// Implements [`Iterator`] over [`DecodedFrameRef`] for convenient consumption.
208pub struct Decoder<'a> {
209    data: &'a [u8],
210    pos: usize,
211    registry: SchemaRegistry,
212    schema_cache: Vec<Option<SchemaCache>>,
213    string_pool: StringPool,
214    stack_pool: StackPool,
215    version: u8,
216    timestamp_base_ns: u64,
217}
218
219impl<'a> Decoder<'a> {
220    pub fn new(data: &'a [u8]) -> Option<Self> {
221        let version = codec::decode_header(data)?;
222        Some(Self {
223            data,
224            pos: HEADER_SIZE,
225            registry: SchemaRegistry::new(),
226            schema_cache: Vec::new(),
227            string_pool: StringPool::new(),
228            stack_pool: StackPool::new(),
229            version,
230            timestamp_base_ns: 0,
231        })
232    }
233
234    pub fn registry(&self) -> &SchemaRegistry {
235        &self.registry
236    }
237
238    pub fn version(&self) -> u8 {
239        self.version
240    }
241
242    pub fn string_pool(&self) -> &StringPool {
243        &self.string_pool
244    }
245
246    pub fn stack_pool(&self) -> &StackPool {
247        &self.stack_pool
248    }
249
250    /// Reset decoder state (schemas, string pool, timestamp base) as if
251    /// starting a fresh stream. Used when a mid-stream header is encountered
252    /// (the "reset frame" pattern for concatenated thread-local batches).
253    fn reset_state(&mut self) {
254        self.registry = SchemaRegistry::new();
255        self.schema_cache.clear();
256        self.string_pool = StringPool::new();
257        self.stack_pool = StackPool::new();
258        self.timestamp_base_ns = 0;
259    }
260
261    /// If the current position starts with a valid header, reset state and
262    /// skip past it, returning true.
263    fn try_consume_reset_header(&mut self) -> bool {
264        if self.pos + HEADER_SIZE <= self.data.len()
265            && codec::decode_header(&self.data[self.pos..]).is_some()
266        {
267            self.reset_state();
268            self.pos += HEADER_SIZE;
269            true
270        } else {
271            false
272        }
273    }
274
275    /// Consume this decoder and create an [`Encoder`](crate::encoder::Encoder) that appends to the
276    /// decoded trace. The encoder inherits the string pool, schema registry,
277    /// and timestamp base so new frames are compatible with the existing data.
278    ///
279    /// No file header is written — the caller is responsible for concatenating
280    /// the encoder's output after the original trace bytes.
281    pub fn into_encoder<W: std::io::Write>(self, writer: W) -> crate::encoder::Encoder<W> {
282        crate::encoder::Encoder::from_decoder(
283            self.registry,
284            self.string_pool,
285            self.stack_pool,
286            self.timestamp_base_ns,
287            writer,
288        )
289    }
290
291    pub(crate) fn schema_info(&self, type_id: WireTypeId) -> Option<SchemaInfo<'_>> {
292        self.schema_cache
293            .get(type_id.0 as usize)
294            .and_then(|s| s.as_ref())
295            .map(|c| SchemaInfo {
296                field_tags: &c.field_tags,
297                has_timestamp: c.entry.has_timestamp,
298            })
299    }
300
301    fn register_schema(&mut self, type_id: WireTypeId, entry: SchemaEntry) -> Result<(), String> {
302        let idx = type_id.0 as usize;
303        if idx >= self.schema_cache.len() {
304            self.schema_cache.resize_with(idx + 1, || None);
305        }
306        self.schema_cache[idx] = Some(SchemaCache {
307            field_tags: entry.fields.iter().map(|f| f.field_type as u8).collect(),
308            entry: entry.clone(),
309        });
310        self.registry.register(type_id, entry)
311    }
312
313    /// Decode the next frame. Returns `Ok(None)` when stream is exhausted.
314    /// Returns `Err` if the stream is malformed (e.g. duplicate type_id with
315    /// a different schema).
316    pub fn next_frame(&mut self) -> Result<Option<DecodedFrame>, DecodeError> {
317        if self.pos >= self.data.len() {
318            return Ok(None);
319        }
320        if self.try_consume_reset_header() {
321            return self.next_frame();
322        }
323        let remaining = &self.data[self.pos..];
324        let base = self.timestamp_base_ns;
325        let (frame, consumed) =
326            match codec::decode_frame(remaining, |type_id| self.schema_info(type_id), base) {
327                Some(r) => r,
328                None => return Ok(None),
329            };
330        self.pos += consumed;
331        match frame {
332            Frame::Schema { type_id, entry } => {
333                let result = DecodedFrame::Schema(entry.clone());
334                self.register_schema(type_id, entry)
335                    .map_err(|msg| DecodeError {
336                        pos: self.pos,
337                        message: msg,
338                    })?;
339                Ok(Some(result))
340            }
341            Frame::Event {
342                type_id,
343                timestamp_ns,
344                values,
345            } => {
346                if let Some(ts) = timestamp_ns {
347                    self.timestamp_base_ns = ts;
348                }
349                Ok(Some(DecodedFrame::Event {
350                    type_id,
351                    timestamp_ns,
352                    values,
353                }))
354            }
355            Frame::StringPool(entries) => {
356                for e in &entries {
357                    if let Ok(s) = String::from_utf8(e.data.clone()) {
358                        self.string_pool.insert(InternedString(e.pool_id), s);
359                    }
360                }
361                Ok(Some(DecodedFrame::StringPool(entries)))
362            }
363            Frame::StackPool(entries) => {
364                for e in &entries {
365                    self.stack_pool
366                        .insert(InternedStackFrames(e.pool_id), e.frames.clone().into());
367                }
368                Ok(Some(DecodedFrame::StackPool(entries)))
369            }
370            Frame::TimestampReset(ts) => {
371                self.timestamp_base_ns = ts;
372                self.next_frame() // consume silently, return next real frame
373            }
374            Frame::SchemaAnnotations {
375                type_id,
376                annotations,
377            } => {
378                // Merge annotations into the cached schema (lenient: skip if unknown type_id)
379                if let Some(cache) = self
380                    .schema_cache
381                    .get_mut(type_id.0 as usize)
382                    .and_then(|s| s.as_mut())
383                {
384                    cache.entry.annotations.extend_from_slice(&annotations);
385                }
386                if let Some(entry) = self.registry.schemas.get_mut(&type_id) {
387                    entry.annotations.extend_from_slice(&annotations);
388                }
389                Ok(Some(DecodedFrame::SchemaAnnotations {
390                    type_id,
391                    annotations,
392                }))
393            }
394        }
395    }
396
397    /// Collect all remaining frames. Stops on error or end of stream.
398    pub fn decode_all(&mut self) -> Vec<DecodedFrame> {
399        let mut frames = Vec::new();
400        while let Ok(Some(f)) = self.next_frame() {
401            frames.push(f);
402        }
403        frames
404    }
405
406    /// Decode the next frame without copying field data. Returns `Ok(None)` when
407    /// stream is exhausted. Returns `Err` on malformed data.
408    pub fn next_frame_ref(&mut self) -> Result<Option<DecodedFrameRef<'a>>, DecodeError> {
409        if self.pos >= self.data.len() {
410            return Ok(None);
411        }
412        if self.try_consume_reset_header() {
413            return self.next_frame_ref();
414        }
415        let remaining = &self.data[self.pos..];
416        let base = self.timestamp_base_ns;
417        let (frame, consumed) =
418            match codec::decode_frame_ref(remaining, |type_id| self.schema_info(type_id), base) {
419                Some(r) => r,
420                None => return Ok(None),
421            };
422        self.pos += consumed;
423        match frame {
424            FrameRef::Schema { type_id, entry } => {
425                let result = DecodedFrameRef::Schema(entry.clone());
426                self.register_schema(type_id, entry)
427                    .map_err(|msg| DecodeError {
428                        pos: self.pos,
429                        message: msg,
430                    })?;
431                Ok(Some(result))
432            }
433            FrameRef::Event {
434                type_id,
435                timestamp_ns,
436                values,
437            } => {
438                if let Some(ts) = timestamp_ns {
439                    self.timestamp_base_ns = ts;
440                }
441                Ok(Some(DecodedFrameRef::Event {
442                    type_id,
443                    timestamp_ns,
444                    values,
445                }))
446            }
447            FrameRef::StringPool(entries) => {
448                for e in &entries {
449                    if let Ok(s) = std::str::from_utf8(e.data) {
450                        self.string_pool
451                            .insert(InternedString(e.pool_id), s.to_string());
452                    }
453                }
454                Ok(Some(DecodedFrameRef::StringPool(entries)))
455            }
456            FrameRef::StackPool(entries) => {
457                for e in &entries {
458                    self.stack_pool
459                        .insert(InternedStackFrames(e.pool_id), e.to_stack_frames());
460                }
461                Ok(Some(DecodedFrameRef::StackPool(entries)))
462            }
463            FrameRef::TimestampReset(ts) => {
464                self.timestamp_base_ns = ts;
465                self.next_frame_ref()
466            }
467            FrameRef::SchemaAnnotations {
468                type_id,
469                annotations,
470            } => {
471                if let Some(cache) = self
472                    .schema_cache
473                    .get_mut(type_id.0 as usize)
474                    .and_then(|s| s.as_mut())
475                {
476                    cache.entry.annotations.extend_from_slice(&annotations);
477                }
478                if let Some(entry) = self.registry.schemas.get_mut(&type_id) {
479                    entry.annotations.extend_from_slice(&annotations);
480                }
481                Ok(Some(DecodedFrameRef::SchemaAnnotations {
482                    type_id,
483                    annotations,
484                }))
485            }
486        }
487    }
488
489    /// Collect all remaining frames using zero-copy decoding. Stops on error or end of stream.
490    pub fn decode_all_ref(&mut self) -> Vec<DecodedFrameRef<'a>> {
491        let mut frames = Vec::new();
492        while let Ok(Some(f)) = self.next_frame_ref() {
493            frames.push(f);
494        }
495        frames
496    }
497
498    /// Process all events with a callback, avoiding per-event Vec allocations.
499    /// Schemas and string pools are registered automatically.
500    ///
501    /// The [`RawEvent`] passed to the callback borrows from the decoder's input
502    /// buffer. The `fields` slice is reused across calls, so values cannot be
503    /// stored across iterations without copying.
504    ///
505    /// Returns `Err` if the stream is malformed.
506    pub fn for_each_event(
507        &mut self,
508        mut f: impl for<'f> FnMut(RawEvent<'a, 'f>),
509    ) -> Result<(), DecodeError> {
510        self.try_for_each_event(|ev| {
511            f(ev);
512            Ok::<(), std::convert::Infallible>(())
513        })
514        .map_err(|e| match e {
515            TryForEachError::Decode(d) => d,
516            TryForEachError::User(inf) => match inf {},
517        })
518    }
519
520    /// Like [`for_each_event`](Self::for_each_event), but the callback may
521    /// return an error to stop iteration early.
522    pub fn try_for_each_event<E>(
523        &mut self,
524        mut f: impl for<'f> FnMut(RawEvent<'a, 'f>) -> Result<(), E>,
525    ) -> Result<(), TryForEachError<E>> {
526        let mut values_buf: Vec<FieldValueRef<'a>> = Vec::new();
527        while self.pos < self.data.len() {
528            let remaining = &self.data[self.pos..];
529            let tag = match remaining.first() {
530                Some(t) => *t,
531                None => break,
532            };
533            match tag {
534                codec::TAG_EVENT => {
535                    let mut pos = 1;
536                    let type_id = match remaining.get(pos..pos + 2) {
537                        Some(b) => {
538                            pos += 2;
539                            WireTypeId(u16::from_le_bytes(b.try_into().unwrap()))
540                        }
541                        None => {
542                            return Err(TryForEachError::Decode(DecodeError {
543                                pos: self.pos,
544                                message: "truncated event frame".into(),
545                            }));
546                        }
547                    };
548                    let cache = match self
549                        .schema_cache
550                        .get(type_id.0 as usize)
551                        .and_then(|s| s.as_ref())
552                    {
553                        Some(c) => c,
554                        None => {
555                            return Err(TryForEachError::Decode(DecodeError {
556                                pos: self.pos,
557                                message: format!("unknown type_id {type_id:?}"),
558                            }));
559                        }
560                    };
561
562                    let timestamp_ns = if cache.entry.has_timestamp {
563                        match codec::decode_u24_le(&remaining[pos..]) {
564                            Some(delta) => {
565                                pos += 3;
566                                Some(self.timestamp_base_ns + delta as u64)
567                            }
568                            None => {
569                                return Err(TryForEachError::Decode(DecodeError {
570                                    pos: self.pos + pos,
571                                    message: "truncated timestamp delta".into(),
572                                }));
573                            }
574                        }
575                    } else {
576                        None
577                    };
578
579                    values_buf.clear();
580                    for &ftag in &cache.field_tags {
581                        let inner_type = match FieldType::from_tag(ftag) {
582                            Some(ft) => ft,
583                            None => {
584                                return Err(TryForEachError::Decode(DecodeError {
585                                    pos: self.pos + pos,
586                                    message: format!("unknown field type tag {ftag:#x}"),
587                                }));
588                            }
589                        };
590                        if inner_type.is_optional() {
591                            match remaining.get(pos) {
592                                Some(0x00) => {
593                                    values_buf.push(FieldValueRef::None);
594                                    pos += 1;
595                                }
596                                Some(_) => {
597                                    pos += 1;
598                                    match FieldValueRef::decode(inner_type.inner(), remaining, pos)
599                                    {
600                                        Some((val, consumed)) => {
601                                            values_buf.push(val);
602                                            pos += consumed;
603                                        }
604                                        None => {
605                                            return Err(TryForEachError::Decode(DecodeError {
606                                                pos: self.pos + pos,
607                                                message: "truncated optional field value".into(),
608                                            }));
609                                        }
610                                    }
611                                }
612                                None => {
613                                    return Err(TryForEachError::Decode(DecodeError {
614                                        pos: self.pos + pos,
615                                        message: "truncated optional field prefix".into(),
616                                    }));
617                                }
618                            }
619                        } else {
620                            match FieldValueRef::decode(inner_type, remaining, pos) {
621                                Some((val, consumed)) => {
622                                    values_buf.push(val);
623                                    pos += consumed;
624                                }
625                                None => {
626                                    return Err(TryForEachError::Decode(DecodeError {
627                                        pos: self.pos + pos,
628                                        message: "truncated field value".into(),
629                                    }));
630                                }
631                            }
632                        }
633                    }
634                    // Update mutable state. The borrow checker allows this
635                    // because `cache` borrows `self.schema_cache` while we
636                    // mutate `self.pos` and `self.timestamp_base_ns`, which
637                    // are disjoint fields. We use a block with destructured
638                    // refs to make this explicit.
639                    {
640                        let Self {
641                            pos: self_pos,
642                            timestamp_base_ns,
643                            ..
644                        } = self;
645                        *self_pos += pos;
646                        if let Some(ts) = timestamp_ns {
647                            *timestamp_base_ns = ts;
648                        }
649                    }
650                    f(RawEvent {
651                        type_id,
652                        name: &cache.entry.name,
653                        timestamp_ns,
654                        fields: &values_buf,
655                        schema: &cache.entry,
656                        string_pool: &self.string_pool,
657                        stack_pool: &self.stack_pool,
658                    })
659                    .map_err(TryForEachError::User)?;
660                }
661                codec::TAG_TIMESTAMP_RESET => {
662                    let ts = match self.data.get(self.pos + 1..self.pos + 9) {
663                        Some(b) => u64::from_le_bytes(b.try_into().unwrap()),
664                        None => {
665                            return Err(TryForEachError::Decode(DecodeError {
666                                pos: self.pos,
667                                message: "truncated timestamp reset".into(),
668                            }));
669                        }
670                    };
671                    self.timestamp_base_ns = ts;
672                    self.pos += 9;
673                }
674                _ => {
675                    // Mid-stream header = reset frame (tag 0x54 = 'T' from TRC\0)
676                    if tag == codec::MAGIC[0] && self.try_consume_reset_header() {
677                        continue;
678                    }
679                    match self.next_frame_ref() {
680                        Ok(Some(_)) => {}
681                        Ok(None) => {
682                            return Err(TryForEachError::Decode(DecodeError {
683                                pos: self.pos,
684                                message: format!("failed to decode frame with tag 0x{tag:02x}"),
685                            }));
686                        }
687                        Err(e) => return Err(TryForEachError::Decode(e)),
688                    }
689                }
690            }
691        }
692        Ok(())
693    }
694
695    /// Returns an iterator that yields only [`DecodedFrameRef::Event`] variants,
696    /// silently consuming schema, string-pool, and symbol-table frames
697    /// (while still updating internal decoder state).
698    pub fn events(&mut self) -> EventIter<'_, 'a> {
699        EventIter { decoder: self }
700    }
701}
702
703impl<'a> Iterator for Decoder<'a> {
704    type Item = Result<DecodedFrameRef<'a>, DecodeError>;
705
706    fn next(&mut self) -> Option<Self::Item> {
707        self.next_frame_ref().transpose()
708    }
709}
710
711/// Iterator that yields only [`DecodedFrameRef::Event`] frames,
712/// consuming non-event frames to keep decoder state up to date.
713pub struct EventIter<'d, 'a> {
714    decoder: &'d mut Decoder<'a>,
715}
716
717impl<'d, 'a> Iterator for EventIter<'d, 'a> {
718    type Item = Result<DecodedFrameRef<'a>, DecodeError>;
719
720    fn next(&mut self) -> Option<Self::Item> {
721        loop {
722            match self.decoder.next()? {
723                Ok(frame @ DecodedFrameRef::Event { .. }) => return Some(Ok(frame)),
724                Ok(_) => continue, // schema, string pool, symbol table — skip
725                Err(e) => return Some(Err(e)),
726            }
727        }
728    }
729}
730
731#[cfg(test)]
732mod tests {
733    use super::*;
734    use crate::encoder::Encoder;
735    use crate::schema::FieldDef;
736    use crate::types::{FieldType, FieldValue};
737
/// A stream holding only the file header reports version 1 and no frames.
#[test]
fn decode_empty_stream() {
    let bytes = Encoder::new().finish();
    let mut decoder = Decoder::new(&bytes).unwrap();
    assert_eq!(decoder.version(), 1);
    let first = decoder.next_frame().unwrap();
    assert!(first.is_none());
}
746
/// Registering a schema on the encoder produces a schema frame with the
/// same name on the decoding side.
#[test]
fn decode_schema_frame() {
    let mut encoder = Encoder::new();
    let fields = vec![FieldDef {
        name: "v".into(),
        field_type: FieldType::Varint,
    }];
    encoder.register_schema("Ev", fields).unwrap();
    let bytes = encoder.finish();

    let mut decoder = Decoder::new(&bytes).unwrap();
    let first = decoder.next_frame().unwrap().unwrap();
    assert!(matches!(first, DecodedFrame::Schema(entry) if entry.name == "Ev"));
}
763
/// An event written after its schema decodes as the second frame and
/// carries the single `Varint(42)` field value.
#[test]
fn decode_event_after_schema() {
    let mut encoder = Encoder::new();
    let fields = vec![FieldDef {
        name: "v".into(),
        field_type: FieldType::Varint,
    }];
    let schema = encoder.register_schema("Ev", fields).unwrap();
    let payload = [FieldValue::Varint(1_000), FieldValue::Varint(42)];
    encoder.write_event(&schema, &payload).unwrap();
    let bytes = encoder.finish();

    let mut decoder = Decoder::new(&bytes).unwrap();
    let frames = decoder.decode_all();
    assert_eq!(frames.len(), 2);
    match &frames[1] {
        DecodedFrame::Event { values, .. } => {
            assert_eq!(*values, vec![FieldValue::Varint(42)]);
        }
        _ => panic!("expected event"),
    }
}
792
/// Decoding a stream with an interned string makes it resolvable through
/// the decoder's string pool.
#[test]
fn decode_string_pool_builds_map() {
    let mut encoder = Encoder::new();
    let interned = encoder.intern_string("hello").unwrap();
    let bytes = encoder.finish();

    let mut decoder = Decoder::new(&bytes).unwrap();
    let _frames = decoder.decode_all();
    let resolved = decoder.string_pool().get(interned);
    assert_eq!(resolved, Some("hello"));
}
803
/// Ten events plus their schema decode into eleven frames.
#[test]
fn decode_multiple_events() {
    let mut encoder = Encoder::new();
    let fields = vec![FieldDef {
        name: "v".into(),
        field_type: FieldType::Varint,
    }];
    let schema = encoder.register_schema("Ev", fields).unwrap();
    for n in 0..10u64 {
        let payload = [FieldValue::Varint(n * 1000), FieldValue::Varint(n)];
        encoder.write_event(&schema, &payload).unwrap();
    }
    let bytes = encoder.finish();

    let mut decoder = Decoder::new(&bytes).unwrap();
    let decoded = decoder.decode_all();
    assert_eq!(decoded.len(), 11);
}
829
830    #[test]
831    fn bad_header_returns_none() {
832        assert!(Decoder::new(&[0x00, 0x00, 0x00, 0x00, 1]).is_none());
833    }
834
835    #[test]
836    fn iterator_yields_all_frames() {
837        let mut enc = Encoder::new();
838        let schema = enc
839            .register_schema(
840                "Ev",
841                vec![FieldDef {
842                    name: "v".into(),
843                    field_type: FieldType::Varint,
844                }],
845            )
846            .unwrap();
847        for i in 0..3u64 {
848            enc.write_event(
849                &schema,
850                &[FieldValue::Varint(i * 1000), FieldValue::Varint(i)],
851            )
852            .unwrap();
853        }
854        let data = enc.finish();
855
856        let dec = Decoder::new(&data).unwrap();
857        let frames: Vec<_> = dec.collect::<Result<Vec<_>, _>>().unwrap();
858        // 1 schema + 3 events
859        assert_eq!(frames.len(), 4);
860        assert!(matches!(frames[0], DecodedFrameRef::Schema(_)));
861        assert!(matches!(frames[1], DecodedFrameRef::Event { .. }));
862    }
863
864    #[test]
865    fn iterator_early_termination() {
866        let mut enc = Encoder::new();
867        let schema = enc
868            .register_schema(
869                "Ev",
870                vec![FieldDef {
871                    name: "v".into(),
872                    field_type: FieldType::Varint,
873                }],
874            )
875            .unwrap();
876        for i in 0..10u64 {
877            enc.write_event(
878                &schema,
879                &[FieldValue::Varint(i * 1000), FieldValue::Varint(i)],
880            )
881            .unwrap();
882        }
883        let data = enc.finish();
884
885        let mut dec = Decoder::new(&data).unwrap();
886        // Take just 2 frames (schema + first event), don't decode the rest
887        let first_two: Vec<_> = dec.by_ref().take(2).collect::<Result<Vec<_>, _>>().unwrap();
888        assert_eq!(first_two.len(), 2);
889        // Decoder should still have remaining data
890        let next = dec.next();
891        assert!(next.is_some());
892    }
893
894    #[test]
895    fn events_iterator_skips_schema() {
896        let mut enc = Encoder::new();
897        let schema = enc
898            .register_schema(
899                "Ev",
900                vec![FieldDef {
901                    name: "v".into(),
902                    field_type: FieldType::Varint,
903                }],
904            )
905            .unwrap();
906        enc.write_event(
907            &schema,
908            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
909        )
910        .unwrap();
911        enc.write_event(
912            &schema,
913            &[FieldValue::Varint(2_000), FieldValue::Varint(99)],
914        )
915        .unwrap();
916        let data = enc.finish();
917
918        let mut dec = Decoder::new(&data).unwrap();
919        let events: Vec<_> = dec.events().collect::<Result<Vec<_>, _>>().unwrap();
920        // Only events, no schema frame
921        assert_eq!(events.len(), 2);
922        for ev in &events {
923            assert!(matches!(ev, DecodedFrameRef::Event { .. }));
924        }
925    }
926
927    #[test]
928    fn events_iterator_first_event_only() {
929        let mut enc = Encoder::new();
930        let schema = enc
931            .register_schema(
932                "Ev",
933                vec![FieldDef {
934                    name: "v".into(),
935                    field_type: FieldType::Varint,
936                }],
937            )
938            .unwrap();
939        for i in 0..5u64 {
940            enc.write_event(
941                &schema,
942                &[FieldValue::Varint(i * 1000), FieldValue::Varint(i)],
943            )
944            .unwrap();
945        }
946        let data = enc.finish();
947
948        let mut dec = Decoder::new(&data).unwrap();
949        // Get just the first event — schema is consumed internally
950        let first = dec.events().next().unwrap().unwrap();
951        assert!(matches!(first, DecodedFrameRef::Event { .. }));
952    }
953}