Skip to main content

dial9_trace_format/
decoder.rs

1//! Streaming decoder for reading trace files.
2//!
3//! [`Decoder`] reads the file header, processes schema and string-pool frames,
4//! and yields events as [`DecodedFrame`] (owned) or [`DecodedFrameRef`]
5//! (zero-copy). It also implements [`Iterator`] and provides a
6//! [`for_each_event`](Decoder::for_each_event) callback API for
7//! allocation-free processing.
8
9use crate::codec::{
10    self, Frame, FrameRef, HEADER_SIZE, PoolEntry, PoolEntryRef, SchemaInfo, WireTypeId,
11};
12use crate::schema::{SchemaEntry, SchemaRegistry};
13use crate::types::{FieldType, FieldValueRef, InternedString};
14use std::collections::HashMap;
15use std::fmt;
16
/// Failure reported when the decoder cannot keep reading the stream.
///
/// Frames carry no length prefix, so the decoder has no way to step over a
/// malformed frame and pick up at the following one. A decode error is
/// therefore unrecoverable.
#[derive(Debug, Clone)]
pub struct DecodeError {
    /// Byte offset into the input reported for the failure.
    pub pos: usize,
    /// Human-readable description of the failure.
    pub message: String,
}

impl fmt::Display for DecodeError {
    /// Renders as `decode error at byte <pos>: <message>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { pos, message } = self;
        write!(f, "decode error at byte {pos}: {message}")
    }
}

impl std::error::Error for DecodeError {}
33
34/// Error returned by [`Decoder::try_for_each_event`].
35#[derive(Debug)]
36pub enum TryForEachError<E> {
37    Decode(DecodeError),
38    User(E),
39}
40
41impl<E: fmt::Display> fmt::Display for TryForEachError<E> {
42    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43        match self {
44            TryForEachError::Decode(e) => write!(f, "{e}"),
45            TryForEachError::User(e) => write!(f, "{e}"),
46        }
47    }
48}
49
50impl<E: fmt::Display + fmt::Debug> std::error::Error for TryForEachError<E> {}
51
52/// A decoded event passed to [`Decoder::for_each_event`].
53///
54/// `'a` is the lifetime of the input data buffer (strings, stack frames borrow from it).
55/// `'f` is the lifetime of the `fields` slice and schema name (reused across calls).
56#[non_exhaustive]
57pub struct RawEvent<'a, 'f> {
58    pub type_id: WireTypeId,
59    pub name: &'f str,
60    pub timestamp_ns: Option<u64>,
61    pub fields: &'f [FieldValueRef<'a>],
62    pub schema: &'f SchemaEntry,
63    pub string_pool: &'f StringPool,
64}
65
66impl<'a, 'f> RawEvent<'a, 'f> {
67    /// Field names from the schema, parallel to `fields`.
68    pub fn field_names(&self) -> impl Iterator<Item = &'f str> {
69        self.schema.fields.iter().map(|f| f.name.as_str())
70    }
71}
72
73/// A map from interned string IDs to their resolved string values.
74///
75/// Populated automatically by the [`Decoder`] as it processes `StringPool` frames.
76/// Pass a reference to [`crate::TraceEvent::decode`] so that `InternedString` fields
77/// resolve to `&str` in derived `Ref` types.
78#[derive(Debug, Default)]
79pub struct StringPool(pub(crate) HashMap<InternedString, String>);
80
81impl StringPool {
82    pub(crate) fn new() -> Self {
83        Self(HashMap::default())
84    }
85
86    pub(crate) fn insert(&mut self, id: InternedString, value: String) {
87        self.0.insert(id, value);
88    }
89
90    pub fn get(&self, id: InternedString) -> Option<&str> {
91        self.0.get(&id).map(|s| s.as_str())
92    }
93
94    pub fn len(&self) -> usize {
95        self.0.len()
96    }
97
98    pub fn is_empty(&self) -> bool {
99        self.0.is_empty()
100    }
101
102    /// Iterate over all interned strings as `(id, value)` pairs.
103    pub fn iter(&self) -> impl Iterator<Item = (InternedString, &str)> {
104        self.0.iter().map(|(&id, v)| (id, v.as_str()))
105    }
106}
107
/// Decoded events yielded by the decoder.
///
/// This is the owned form returned by [`Decoder::next_frame`]; see
/// [`DecodedFrameRef`] for the variant that borrows from the input buffer.
#[derive(Debug, Clone, PartialEq)]
pub enum DecodedFrame {
    /// A schema definition; the decoder registers it before yielding it.
    Schema(SchemaEntry),
    /// An event decoded against a previously registered schema.
    Event {
        /// Wire type id identifying the event's schema.
        type_id: WireTypeId,
        /// Absolute timestamp in nanoseconds, if the schema has `has_timestamp`.
        timestamp_ns: Option<u64>,
        /// Decoded field values.
        values: Vec<crate::types::FieldValue>,
    },
    /// Entries of a string-pool frame. Entries whose bytes are valid UTF-8
    /// are also inserted into the decoder's [`StringPool`].
    StringPool(Vec<PoolEntry>),
}
120
/// Zero-copy decoded frame that borrows from the input buffer.
///
/// Returned by [`Decoder::next_frame_ref`] and by the decoder's [`Iterator`]
/// implementation. `'a` is the lifetime of the decoder's input data.
#[derive(Debug, Clone, PartialEq)]
pub enum DecodedFrameRef<'a> {
    /// A schema definition; the decoder registers it before yielding it.
    Schema(SchemaEntry),
    /// An event decoded against a previously registered schema.
    Event {
        /// Wire type id identifying the event's schema.
        type_id: WireTypeId,
        /// Absolute timestamp in nanoseconds, or `None` when the frame carried none.
        timestamp_ns: Option<u64>,
        /// Decoded field values, borrowing from the input buffer.
        values: Vec<FieldValueRef<'a>>,
    },
    /// Entries of a string-pool frame. Entries whose bytes are valid UTF-8
    /// are also inserted into the decoder's [`StringPool`].
    StringPool(Vec<PoolEntryRef<'a>>),
}
132
/// Per-type-id decoding data kept by [`Decoder`] alongside the schema registry.
struct SchemaCache {
    /// The schema as it was registered.
    entry: SchemaEntry,
    /// Raw field type tags for fast decode (avoids re-extracting from entry.fields).
    field_tags: Vec<u8>,
}
138
/// Streaming trace file decoder.
///
/// Reads from a byte slice, processing schema, string-pool, and event frames.
/// Implements [`Iterator`] over [`DecodedFrameRef`] for convenient consumption.
pub struct Decoder<'a> {
    /// Complete input buffer, including the file header.
    data: &'a [u8],
    /// Offset into `data` of the next byte to decode.
    pos: usize,
    /// Schemas registered by the schema frames seen so far.
    registry: SchemaRegistry,
    /// Decode-time schema data, indexed by the numeric wire type id;
    /// `None` marks ids with no registered schema.
    schema_cache: Vec<Option<SchemaCache>>,
    /// Strings collected from the string-pool frames seen so far.
    string_pool: StringPool,
    /// Version value obtained from the file header in `new`.
    version: u8,
    /// Base that event timestamp deltas are relative to. Updated by every
    /// timestamped event and by timestamp-reset frames.
    timestamp_base_ns: u64,
}
152
153impl<'a> Decoder<'a> {
154    pub fn new(data: &'a [u8]) -> Option<Self> {
155        let version = codec::decode_header(data)?;
156        Some(Self {
157            data,
158            pos: HEADER_SIZE,
159            registry: SchemaRegistry::new(),
160            schema_cache: Vec::new(),
161            string_pool: StringPool::new(),
162            version,
163            timestamp_base_ns: 0,
164        })
165    }
166
167    pub fn registry(&self) -> &SchemaRegistry {
168        &self.registry
169    }
170
171    pub fn version(&self) -> u8 {
172        self.version
173    }
174
175    pub fn string_pool(&self) -> &StringPool {
176        &self.string_pool
177    }
178
179    /// Reset decoder state (schemas, string pool, timestamp base) as if
180    /// starting a fresh stream. Used when a mid-stream header is encountered
181    /// (the "reset frame" pattern for concatenated thread-local batches).
182    fn reset_state(&mut self) {
183        self.registry = SchemaRegistry::new();
184        self.schema_cache.clear();
185        self.string_pool = StringPool::new();
186        self.timestamp_base_ns = 0;
187    }
188
189    /// If the current position starts with a valid header, reset state and
190    /// skip past it, returning true.
191    fn try_consume_reset_header(&mut self) -> bool {
192        if self.pos + HEADER_SIZE <= self.data.len()
193            && codec::decode_header(&self.data[self.pos..]).is_some()
194        {
195            self.reset_state();
196            self.pos += HEADER_SIZE;
197            true
198        } else {
199            false
200        }
201    }
202
203    /// Consume this decoder and create an [`Encoder`](crate::encoder::Encoder) that appends to the
204    /// decoded trace. The encoder inherits the string pool, schema registry,
205    /// and timestamp base so new frames are compatible with the existing data.
206    ///
207    /// No file header is written — the caller is responsible for concatenating
208    /// the encoder's output after the original trace bytes.
209    pub fn into_encoder<W: std::io::Write>(self, writer: W) -> crate::encoder::Encoder<W> {
210        crate::encoder::Encoder::from_decoder(
211            self.registry,
212            self.string_pool,
213            self.timestamp_base_ns,
214            writer,
215        )
216    }
217
218    pub(crate) fn schema_info(&self, type_id: WireTypeId) -> Option<SchemaInfo<'_>> {
219        self.schema_cache
220            .get(type_id.0 as usize)
221            .and_then(|s| s.as_ref())
222            .map(|c| SchemaInfo {
223                field_tags: &c.field_tags,
224                has_timestamp: c.entry.has_timestamp,
225            })
226    }
227
228    fn register_schema(&mut self, type_id: WireTypeId, entry: SchemaEntry) -> Result<(), String> {
229        let idx = type_id.0 as usize;
230        if idx >= self.schema_cache.len() {
231            self.schema_cache.resize_with(idx + 1, || None);
232        }
233        self.schema_cache[idx] = Some(SchemaCache {
234            field_tags: entry.fields.iter().map(|f| f.field_type as u8).collect(),
235            entry: entry.clone(),
236        });
237        self.registry.register(type_id, entry)
238    }
239
240    /// Decode the next frame. Returns `Ok(None)` when stream is exhausted.
241    /// Returns `Err` if the stream is malformed (e.g. duplicate type_id with
242    /// a different schema).
243    pub fn next_frame(&mut self) -> Result<Option<DecodedFrame>, DecodeError> {
244        if self.pos >= self.data.len() {
245            return Ok(None);
246        }
247        if self.try_consume_reset_header() {
248            return self.next_frame();
249        }
250        let remaining = &self.data[self.pos..];
251        let base = self.timestamp_base_ns;
252        let (frame, consumed) =
253            match codec::decode_frame(remaining, |type_id| self.schema_info(type_id), base) {
254                Some(r) => r,
255                None => return Ok(None),
256            };
257        self.pos += consumed;
258        match frame {
259            Frame::Schema { type_id, entry } => {
260                let result = DecodedFrame::Schema(entry.clone());
261                self.register_schema(type_id, entry)
262                    .map_err(|msg| DecodeError {
263                        pos: self.pos,
264                        message: msg,
265                    })?;
266                Ok(Some(result))
267            }
268            Frame::Event {
269                type_id,
270                timestamp_ns,
271                values,
272            } => {
273                if let Some(ts) = timestamp_ns {
274                    self.timestamp_base_ns = ts;
275                }
276                Ok(Some(DecodedFrame::Event {
277                    type_id,
278                    timestamp_ns,
279                    values,
280                }))
281            }
282            Frame::StringPool(entries) => {
283                for e in &entries {
284                    if let Ok(s) = String::from_utf8(e.data.clone()) {
285                        self.string_pool.insert(InternedString(e.pool_id), s);
286                    }
287                }
288                Ok(Some(DecodedFrame::StringPool(entries)))
289            }
290            Frame::TimestampReset(ts) => {
291                self.timestamp_base_ns = ts;
292                self.next_frame() // consume silently, return next real frame
293            }
294        }
295    }
296
297    /// Collect all remaining frames. Stops on error or end of stream.
298    pub fn decode_all(&mut self) -> Vec<DecodedFrame> {
299        let mut frames = Vec::new();
300        while let Ok(Some(f)) = self.next_frame() {
301            frames.push(f);
302        }
303        frames
304    }
305
306    /// Decode the next frame without copying field data. Returns `Ok(None)` when
307    /// stream is exhausted. Returns `Err` on malformed data.
308    pub fn next_frame_ref(&mut self) -> Result<Option<DecodedFrameRef<'a>>, DecodeError> {
309        if self.pos >= self.data.len() {
310            return Ok(None);
311        }
312        if self.try_consume_reset_header() {
313            return self.next_frame_ref();
314        }
315        let remaining = &self.data[self.pos..];
316        let base = self.timestamp_base_ns;
317        let (frame, consumed) =
318            match codec::decode_frame_ref(remaining, |type_id| self.schema_info(type_id), base) {
319                Some(r) => r,
320                None => return Ok(None),
321            };
322        self.pos += consumed;
323        match frame {
324            FrameRef::Schema { type_id, entry } => {
325                let result = DecodedFrameRef::Schema(entry.clone());
326                self.register_schema(type_id, entry)
327                    .map_err(|msg| DecodeError {
328                        pos: self.pos,
329                        message: msg,
330                    })?;
331                Ok(Some(result))
332            }
333            FrameRef::Event {
334                type_id,
335                timestamp_ns,
336                values,
337            } => {
338                if let Some(ts) = timestamp_ns {
339                    self.timestamp_base_ns = ts;
340                }
341                Ok(Some(DecodedFrameRef::Event {
342                    type_id,
343                    timestamp_ns,
344                    values,
345                }))
346            }
347            FrameRef::StringPool(entries) => {
348                for e in &entries {
349                    if let Ok(s) = std::str::from_utf8(e.data) {
350                        self.string_pool
351                            .insert(InternedString(e.pool_id), s.to_string());
352                    }
353                }
354                Ok(Some(DecodedFrameRef::StringPool(entries)))
355            }
356            FrameRef::TimestampReset(ts) => {
357                self.timestamp_base_ns = ts;
358                self.next_frame_ref()
359            }
360        }
361    }
362
363    /// Collect all remaining frames using zero-copy decoding. Stops on error or end of stream.
364    pub fn decode_all_ref(&mut self) -> Vec<DecodedFrameRef<'a>> {
365        let mut frames = Vec::new();
366        while let Ok(Some(f)) = self.next_frame_ref() {
367            frames.push(f);
368        }
369        frames
370    }
371
372    /// Process all events with a callback, avoiding per-event Vec allocations.
373    /// Schemas and string pools are registered automatically.
374    ///
375    /// The [`RawEvent`] passed to the callback borrows from the decoder's input
376    /// buffer. The `fields` slice is reused across calls, so values cannot be
377    /// stored across iterations without copying.
378    ///
379    /// Returns `Err` if the stream is malformed.
380    pub fn for_each_event(
381        &mut self,
382        mut f: impl for<'f> FnMut(RawEvent<'a, 'f>),
383    ) -> Result<(), DecodeError> {
384        self.try_for_each_event(|ev| {
385            f(ev);
386            Ok::<(), std::convert::Infallible>(())
387        })
388        .map_err(|e| match e {
389            TryForEachError::Decode(d) => d,
390            TryForEachError::User(inf) => match inf {},
391        })
392    }
393
394    /// Like [`for_each_event`](Self::for_each_event), but the callback may
395    /// return an error to stop iteration early.
396    pub fn try_for_each_event<E>(
397        &mut self,
398        mut f: impl for<'f> FnMut(RawEvent<'a, 'f>) -> Result<(), E>,
399    ) -> Result<(), TryForEachError<E>> {
400        let mut values_buf: Vec<FieldValueRef<'a>> = Vec::new();
401        while self.pos < self.data.len() {
402            let remaining = &self.data[self.pos..];
403            let tag = match remaining.first() {
404                Some(t) => *t,
405                None => break,
406            };
407            match tag {
408                codec::TAG_EVENT => {
409                    let mut pos = 1;
410                    let type_id = match remaining.get(pos..pos + 2) {
411                        Some(b) => {
412                            pos += 2;
413                            WireTypeId(u16::from_le_bytes(b.try_into().unwrap()))
414                        }
415                        None => {
416                            return Err(TryForEachError::Decode(DecodeError {
417                                pos: self.pos,
418                                message: "truncated event frame".into(),
419                            }));
420                        }
421                    };
422                    let cache = match self
423                        .schema_cache
424                        .get(type_id.0 as usize)
425                        .and_then(|s| s.as_ref())
426                    {
427                        Some(c) => c,
428                        None => {
429                            return Err(TryForEachError::Decode(DecodeError {
430                                pos: self.pos,
431                                message: format!("unknown type_id {type_id:?}"),
432                            }));
433                        }
434                    };
435
436                    let timestamp_ns = if cache.entry.has_timestamp {
437                        match codec::decode_u24_le(&remaining[pos..]) {
438                            Some(delta) => {
439                                pos += 3;
440                                Some(self.timestamp_base_ns + delta as u64)
441                            }
442                            None => {
443                                return Err(TryForEachError::Decode(DecodeError {
444                                    pos: self.pos + pos,
445                                    message: "truncated timestamp delta".into(),
446                                }));
447                            }
448                        }
449                    } else {
450                        None
451                    };
452
453                    values_buf.clear();
454                    for &ftag in &cache.field_tags {
455                        let inner_type = match FieldType::from_tag(ftag) {
456                            Some(ft) => ft,
457                            None => {
458                                return Err(TryForEachError::Decode(DecodeError {
459                                    pos: self.pos + pos,
460                                    message: format!("unknown field type tag {ftag:#x}"),
461                                }));
462                            }
463                        };
464                        if inner_type.is_optional() {
465                            match remaining.get(pos) {
466                                Some(0x00) => {
467                                    values_buf.push(FieldValueRef::None);
468                                    pos += 1;
469                                }
470                                Some(_) => {
471                                    pos += 1;
472                                    match FieldValueRef::decode(inner_type.inner(), remaining, pos)
473                                    {
474                                        Some((val, consumed)) => {
475                                            values_buf.push(val);
476                                            pos += consumed;
477                                        }
478                                        None => {
479                                            return Err(TryForEachError::Decode(DecodeError {
480                                                pos: self.pos + pos,
481                                                message: "truncated optional field value".into(),
482                                            }));
483                                        }
484                                    }
485                                }
486                                None => {
487                                    return Err(TryForEachError::Decode(DecodeError {
488                                        pos: self.pos + pos,
489                                        message: "truncated optional field prefix".into(),
490                                    }));
491                                }
492                            }
493                        } else {
494                            match FieldValueRef::decode(inner_type, remaining, pos) {
495                                Some((val, consumed)) => {
496                                    values_buf.push(val);
497                                    pos += consumed;
498                                }
499                                None => {
500                                    return Err(TryForEachError::Decode(DecodeError {
501                                        pos: self.pos + pos,
502                                        message: "truncated field value".into(),
503                                    }));
504                                }
505                            }
506                        }
507                    }
508                    // Update mutable state. The borrow checker allows this
509                    // because `cache` borrows `self.schema_cache` while we
510                    // mutate `self.pos` and `self.timestamp_base_ns`, which
511                    // are disjoint fields. We use a block with destructured
512                    // refs to make this explicit.
513                    {
514                        let Self {
515                            pos: self_pos,
516                            timestamp_base_ns,
517                            ..
518                        } = self;
519                        *self_pos += pos;
520                        if let Some(ts) = timestamp_ns {
521                            *timestamp_base_ns = ts;
522                        }
523                    }
524                    f(RawEvent {
525                        type_id,
526                        name: &cache.entry.name,
527                        timestamp_ns,
528                        fields: &values_buf,
529                        schema: &cache.entry,
530                        string_pool: &self.string_pool,
531                    })
532                    .map_err(TryForEachError::User)?;
533                }
534                codec::TAG_TIMESTAMP_RESET => {
535                    let ts = match self.data.get(self.pos + 1..self.pos + 9) {
536                        Some(b) => u64::from_le_bytes(b.try_into().unwrap()),
537                        None => {
538                            return Err(TryForEachError::Decode(DecodeError {
539                                pos: self.pos,
540                                message: "truncated timestamp reset".into(),
541                            }));
542                        }
543                    };
544                    self.timestamp_base_ns = ts;
545                    self.pos += 9;
546                }
547                _ => {
548                    // Mid-stream header = reset frame (tag 0x54 = 'T' from TRC\0)
549                    if tag == codec::MAGIC[0] && self.try_consume_reset_header() {
550                        continue;
551                    }
552                    match self.next_frame_ref() {
553                        Ok(Some(_)) => {}
554                        Ok(None) => {
555                            return Err(TryForEachError::Decode(DecodeError {
556                                pos: self.pos,
557                                message: format!("failed to decode frame with tag 0x{tag:02x}"),
558                            }));
559                        }
560                        Err(e) => return Err(TryForEachError::Decode(e)),
561                    }
562                }
563            }
564        }
565        Ok(())
566    }
567
568    /// Returns an iterator that yields only [`DecodedFrameRef::Event`] variants,
569    /// silently consuming schema, string-pool, and symbol-table frames
570    /// (while still updating internal decoder state).
571    pub fn events(&mut self) -> EventIter<'_, 'a> {
572        EventIter { decoder: self }
573    }
574}
575
576impl<'a> Iterator for Decoder<'a> {
577    type Item = Result<DecodedFrameRef<'a>, DecodeError>;
578
579    fn next(&mut self) -> Option<Self::Item> {
580        self.next_frame_ref().transpose()
581    }
582}
583
584/// Iterator that yields only [`DecodedFrameRef::Event`] frames,
585/// consuming non-event frames to keep decoder state up to date.
586pub struct EventIter<'d, 'a> {
587    decoder: &'d mut Decoder<'a>,
588}
589
590impl<'d, 'a> Iterator for EventIter<'d, 'a> {
591    type Item = Result<DecodedFrameRef<'a>, DecodeError>;
592
593    fn next(&mut self) -> Option<Self::Item> {
594        loop {
595            match self.decoder.next()? {
596                Ok(frame @ DecodedFrameRef::Event { .. }) => return Some(Ok(frame)),
597                Ok(_) => continue, // schema, string pool, symbol table — skip
598                Err(e) => return Some(Err(e)),
599            }
600        }
601    }
602}
603
// Round-trip tests: each test builds a small trace with `Encoder` and checks
// what `Decoder` yields for it.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::encoder::Encoder;
    use crate::schema::FieldDef;
    use crate::types::{FieldType, FieldValue};

    /// A trace with nothing written to it decodes to zero frames.
    #[test]
    fn decode_empty_stream() {
        let enc = Encoder::new();
        let data = enc.finish();
        let mut dec = Decoder::new(&data).unwrap();
        assert_eq!(dec.version(), 1);
        assert!(dec.next_frame().unwrap().is_none());
    }

    /// Registering a schema produces a schema frame carrying its name.
    #[test]
    fn decode_schema_frame() {
        let mut enc = Encoder::new();
        enc.register_schema(
            "Ev",
            vec![FieldDef {
                name: "v".into(),
                field_type: FieldType::Varint,
            }],
        )
        .unwrap();
        let data = enc.finish();
        let mut dec = Decoder::new(&data).unwrap();
        let frame = dec.next_frame().unwrap().unwrap();
        assert!(matches!(frame, DecodedFrame::Schema(s) if s.name == "Ev"));
    }

    /// An event written after its schema decodes with the expected field values.
    #[test]
    fn decode_event_after_schema() {
        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "Ev",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();
        // NOTE(review): two values are written but only the second comes back
        // in `values` — presumably the first is the event timestamp; confirm
        // against `Encoder::write_event`.
        enc.write_event(
            &schema,
            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
        )
        .unwrap();
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        let frames = dec.decode_all();
        // Schema frame followed by the event frame.
        assert_eq!(frames.len(), 2);
        if let DecodedFrame::Event { values, .. } = &frames[1] {
            assert_eq!(*values, vec![FieldValue::Varint(42)]);
        } else {
            panic!("expected event");
        }
    }

    /// Interned strings end up in the decoder's string pool under the same id.
    #[test]
    fn decode_string_pool_builds_map() {
        let mut enc = Encoder::new();
        let id = enc.intern_string("hello").unwrap();
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        dec.decode_all();
        assert_eq!(dec.string_pool().get(id), Some("hello"));
    }

    /// Ten events plus their schema decode to eleven frames.
    #[test]
    fn decode_multiple_events() {
        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "Ev",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();
        for i in 0..10u64 {
            enc.write_event(
                &schema,
                &[FieldValue::Varint(i * 1000), FieldValue::Varint(i)],
            )
            .unwrap();
        }
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        let frames = dec.decode_all();
        assert_eq!(frames.len(), 11);
    }

    /// Input that does not start with a valid header is rejected by `new`.
    #[test]
    fn bad_header_returns_none() {
        assert!(Decoder::new(&[0x00, 0x00, 0x00, 0x00, 1]).is_none());
    }

    /// The `Iterator` impl yields schema and event frames in stream order.
    #[test]
    fn iterator_yields_all_frames() {
        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "Ev",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();
        for i in 0..3u64 {
            enc.write_event(
                &schema,
                &[FieldValue::Varint(i * 1000), FieldValue::Varint(i)],
            )
            .unwrap();
        }
        let data = enc.finish();

        let dec = Decoder::new(&data).unwrap();
        let frames: Vec<_> = dec.collect::<Result<Vec<_>, _>>().unwrap();
        // 1 schema + 3 events
        assert_eq!(frames.len(), 4);
        assert!(matches!(frames[0], DecodedFrameRef::Schema(_)));
        assert!(matches!(frames[1], DecodedFrameRef::Event { .. }));
    }

    /// Stopping iteration early leaves the remaining frames available.
    #[test]
    fn iterator_early_termination() {
        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "Ev",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();
        for i in 0..10u64 {
            enc.write_event(
                &schema,
                &[FieldValue::Varint(i * 1000), FieldValue::Varint(i)],
            )
            .unwrap();
        }
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        // Take just 2 frames (schema + first event), don't decode the rest
        let first_two: Vec<_> = dec.by_ref().take(2).collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(first_two.len(), 2);
        // Decoder should still have remaining data
        let next = dec.next();
        assert!(next.is_some());
    }

    /// `events()` yields only event frames; the schema frame is not surfaced.
    #[test]
    fn events_iterator_skips_schema() {
        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "Ev",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();
        enc.write_event(
            &schema,
            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
        )
        .unwrap();
        enc.write_event(
            &schema,
            &[FieldValue::Varint(2_000), FieldValue::Varint(99)],
        )
        .unwrap();
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        let events: Vec<_> = dec.events().collect::<Result<Vec<_>, _>>().unwrap();
        // Only events, no schema frame
        assert_eq!(events.len(), 2);
        for ev in &events {
            assert!(matches!(ev, DecodedFrameRef::Event { .. }));
        }
    }

    /// The first item from `events()` is an event even though a schema frame
    /// precedes it in the stream.
    #[test]
    fn events_iterator_first_event_only() {
        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "Ev",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();
        for i in 0..5u64 {
            enc.write_event(
                &schema,
                &[FieldValue::Varint(i * 1000), FieldValue::Varint(i)],
            )
            .unwrap();
        }
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        // Get just the first event — schema is consumed internally
        let first = dec.events().next().unwrap().unwrap();
        assert!(matches!(first, DecodedFrameRef::Event { .. }));
    }
}