Skip to main content

dial9_trace_format/
decoder.rs

1//! Streaming decoder for reading trace files.
2//!
3//! [`Decoder`] reads the file header, processes schema and string-pool frames,
4//! and yields events as [`DecodedFrame`] (owned) or [`DecodedFrameRef`]
5//! (zero-copy). It also implements [`Iterator`] and provides a
6//! [`for_each_event`](Decoder::for_each_event) callback API for
7//! allocation-free processing.
8
9use crate::codec::{
10    self, Frame, FrameRef, HEADER_SIZE, PoolEntry, PoolEntryRef, SchemaInfo, StackPoolEntry,
11    StackPoolEntryRef, WireTypeId,
12};
13use crate::schema::{SchemaEntry, SchemaRegistry};
14use crate::types::{FieldType, FieldValueRef, InternedStackFrames, InternedString, StackFrames};
15use std::collections::HashMap;
16use std::fmt;
17
/// Unrecoverable failure while reading a trace stream.
///
/// Frames carry no length prefix, so once a frame fails to decode there is
/// no way to find where the next one begins; the decoder cannot skip the
/// malformed frame and continue.
#[derive(Clone, Debug)]
pub struct DecodeError {
    /// Byte offset into the input reported alongside the failure.
    pub pos: usize,
    /// Human-readable description of what went wrong.
    pub message: String,
}

impl fmt::Display for DecodeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { pos, message } = self;
        write!(f, "decode error at byte {pos}: {message}")
    }
}

impl std::error::Error for DecodeError {}
34
35/// Error returned by [`Decoder::try_for_each_event`].
36#[derive(Debug)]
37pub enum TryForEachError<E> {
38    Decode(DecodeError),
39    User(E),
40}
41
42impl<E: fmt::Display> fmt::Display for TryForEachError<E> {
43    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44        match self {
45            TryForEachError::Decode(e) => write!(f, "{e}"),
46            TryForEachError::User(e) => write!(f, "{e}"),
47        }
48    }
49}
50
51impl<E: fmt::Display + fmt::Debug> std::error::Error for TryForEachError<E> {}
52
53/// A decoded event passed to [`Decoder::for_each_event`].
54///
55/// `'a` is the lifetime of the input data buffer (strings, stack frames borrow from it).
56/// `'f` is the lifetime of the `fields` slice and schema name (reused across calls).
57#[non_exhaustive]
58pub struct RawEvent<'a, 'f> {
59    pub type_id: WireTypeId,
60    pub name: &'f str,
61    pub timestamp_ns: Option<u64>,
62    pub fields: &'f [FieldValueRef<'a>],
63    pub schema: &'f SchemaEntry,
64    pub string_pool: &'f StringPool,
65    pub stack_pool: &'f StackPool,
66}
67
68impl<'a, 'f> RawEvent<'a, 'f> {
69    /// Field names from the schema, parallel to `fields`.
70    pub fn field_names(&self) -> impl Iterator<Item = &'f str> {
71        self.schema.fields.iter().map(|f| f.name.as_str())
72    }
73}
74
75/// A map from interned string IDs to their resolved string values.
76///
77/// Populated automatically by the [`Decoder`] as it processes `StringPool` frames.
78/// Pass a reference to [`crate::TraceEvent::decode`] so that `InternedString` fields
79/// resolve to `&str` in derived `Ref` types.
80#[derive(Debug, Default)]
81pub struct StringPool(pub(crate) HashMap<InternedString, String>);
82
83impl StringPool {
84    pub(crate) fn new() -> Self {
85        Self(HashMap::default())
86    }
87
88    pub(crate) fn insert(&mut self, id: InternedString, value: String) {
89        self.0.insert(id, value);
90    }
91
92    pub fn get(&self, id: InternedString) -> Option<&str> {
93        self.0.get(&id).map(|s| s.as_str())
94    }
95
96    pub fn len(&self) -> usize {
97        self.0.len()
98    }
99
100    pub fn is_empty(&self) -> bool {
101        self.0.is_empty()
102    }
103
104    /// Iterate over all interned strings as `(id, value)` pairs.
105    pub fn iter(&self) -> impl Iterator<Item = (InternedString, &str)> {
106        self.0.iter().map(|(&id, v)| (id, v.as_str()))
107    }
108}
109
110/// A map from interned stack-frame IDs to their resolved address vectors.
111///
112/// Populated automatically by the [`Decoder`] as it processes `StackPool` frames.
113#[derive(Debug, Default)]
114pub struct StackPool(pub(crate) HashMap<InternedStackFrames, Vec<u64>>);
115
116impl StackPool {
117    pub(crate) fn new() -> Self {
118        Self(HashMap::default())
119    }
120
121    pub(crate) fn insert(&mut self, id: InternedStackFrames, frames: StackFrames) {
122        self.0.insert(id, frames.0);
123    }
124
125    pub fn get(&self, id: InternedStackFrames) -> Option<&[u64]> {
126        self.0.get(&id).map(|v| v.as_slice())
127    }
128
129    pub fn len(&self) -> usize {
130        self.0.len()
131    }
132
133    pub fn is_empty(&self) -> bool {
134        self.0.is_empty()
135    }
136
137    /// Iterate over all interned stack frames as `(id, frames)` pairs.
138    pub fn iter(&self) -> impl Iterator<Item = (InternedStackFrames, &[u64])> {
139        self.0.iter().map(|(&id, v)| (id, v.as_slice()))
140    }
141}
142
/// Decoded events yielded by the decoder.
///
/// This is the owned form returned by [`Decoder::next_frame`]; see
/// [`DecodedFrameRef`] for the variant that borrows from the input buffer.
#[derive(Debug, Clone, PartialEq)]
pub enum DecodedFrame {
    /// A schema definition. The decoder registers it before yielding it.
    Schema(SchemaEntry),
    /// An event frame.
    Event {
        /// Wire type id of the event.
        type_id: WireTypeId,
        /// Absolute timestamp in nanoseconds, if the schema has `has_timestamp`.
        timestamp_ns: Option<u64>,
        /// Decoded field values (owned).
        values: Vec<crate::types::FieldValue>,
    },
    /// String-pool entries. Entries holding valid UTF-8 are also inserted
    /// into the decoder's [`StringPool`].
    StringPool(Vec<PoolEntry>),
    /// Stack-pool entries. They are also inserted into the decoder's [`StackPool`].
    StackPool(Vec<StackPoolEntry>),
    /// Annotations for the schema identified by `type_id`. The decoder merges
    /// them into that schema when the type id is known.
    SchemaAnnotations {
        /// Wire type id of the schema being annotated.
        type_id: WireTypeId,
        /// The annotations carried by the frame.
        annotations: Vec<crate::schema::FieldAnnotation>,
    },
}
160
/// Zero-copy decoded frame that borrows from the input buffer.
///
/// Returned by [`Decoder::next_frame_ref`] and by the decoder's [`Iterator`]
/// implementation. `'a` is the lifetime of the decoder's input slice.
#[derive(Debug, Clone, PartialEq)]
pub enum DecodedFrameRef<'a> {
    /// A schema definition. The decoder registers it before yielding it.
    Schema(SchemaEntry),
    /// An event frame.
    Event {
        /// Wire type id of the event.
        type_id: WireTypeId,
        /// Absolute timestamp in nanoseconds; `None` when the frame carries none.
        timestamp_ns: Option<u64>,
        /// Decoded field values, borrowing from the input buffer.
        values: Vec<FieldValueRef<'a>>,
    },
    /// String-pool entries. Entries holding valid UTF-8 are also copied into
    /// the decoder's [`StringPool`].
    StringPool(Vec<PoolEntryRef<'a>>),
    /// Stack-pool entries. They are also copied into the decoder's [`StackPool`].
    StackPool(Vec<StackPoolEntryRef<'a>>),
    /// Annotations for the schema identified by `type_id`. The decoder merges
    /// them into that schema when the type id is known.
    SchemaAnnotations {
        /// Wire type id of the schema being annotated.
        type_id: WireTypeId,
        /// The annotations carried by the frame.
        annotations: Vec<crate::schema::FieldAnnotation>,
    },
}
177
/// Per-type-id decode cache kept by [`Decoder`] alongside the schema registry.
struct SchemaCache {
    /// Copy of the registered schema (name, fields, timestamp flag, annotations).
    entry: SchemaEntry,
    /// Raw field type tags for fast decode (avoids re-extracting from entry.fields).
    field_tags: Vec<u8>,
}
183
/// Streaming trace file decoder.
///
/// Reads from a byte slice, processing schema, string-pool, and event frames.
/// Implements [`Iterator`] over [`DecodedFrameRef`] for convenient consumption.
pub struct Decoder<'a> {
    /// The complete input buffer, including the file header.
    data: &'a [u8],
    /// Byte offset into `data` of the next unread frame.
    pos: usize,
    /// Schemas registered from the schema frames seen so far.
    registry: SchemaRegistry,
    /// Decode cache indexed by the numeric wire type id; `None` for ids with no schema.
    schema_cache: Vec<Option<SchemaCache>>,
    /// Strings interned by the string-pool frames seen so far.
    string_pool: StringPool,
    /// Stacks interned by the stack-pool frames seen so far.
    stack_pool: StackPool,
    /// Version obtained from the file header.
    version: u8,
    /// Base that event timestamp deltas are resolved against; updated by every
    /// timestamped event and by timestamp-reset frames.
    timestamp_base_ns: u64,
}
198
199impl<'a> Decoder<'a> {
200    pub fn new(data: &'a [u8]) -> Option<Self> {
201        let version = codec::decode_header(data)?;
202        Some(Self {
203            data,
204            pos: HEADER_SIZE,
205            registry: SchemaRegistry::new(),
206            schema_cache: Vec::new(),
207            string_pool: StringPool::new(),
208            stack_pool: StackPool::new(),
209            version,
210            timestamp_base_ns: 0,
211        })
212    }
213
    /// Schema registry populated from the schema frames decoded so far.
    pub fn registry(&self) -> &SchemaRegistry {
        &self.registry
    }

    /// Version obtained from the file header when the decoder was created.
    pub fn version(&self) -> u8 {
        self.version
    }

    /// Strings interned by the string-pool frames decoded so far.
    pub fn string_pool(&self) -> &StringPool {
        &self.string_pool
    }

    /// Stacks interned by the stack-pool frames decoded so far.
    pub fn stack_pool(&self) -> &StackPool {
        &self.stack_pool
    }
229
230    /// Reset decoder state (schemas, string pool, timestamp base) as if
231    /// starting a fresh stream. Used when a mid-stream header is encountered
232    /// (the "reset frame" pattern for concatenated thread-local batches).
233    fn reset_state(&mut self) {
234        self.registry = SchemaRegistry::new();
235        self.schema_cache.clear();
236        self.string_pool = StringPool::new();
237        self.stack_pool = StackPool::new();
238        self.timestamp_base_ns = 0;
239    }
240
241    /// If the current position starts with a valid header, reset state and
242    /// skip past it, returning true.
243    fn try_consume_reset_header(&mut self) -> bool {
244        if self.pos + HEADER_SIZE <= self.data.len()
245            && codec::decode_header(&self.data[self.pos..]).is_some()
246        {
247            self.reset_state();
248            self.pos += HEADER_SIZE;
249            true
250        } else {
251            false
252        }
253    }
254
255    /// Consume this decoder and create an [`Encoder`](crate::encoder::Encoder) that appends to the
256    /// decoded trace. The encoder inherits the string pool, schema registry,
257    /// and timestamp base so new frames are compatible with the existing data.
258    ///
259    /// No file header is written — the caller is responsible for concatenating
260    /// the encoder's output after the original trace bytes.
261    pub fn into_encoder<W: std::io::Write>(self, writer: W) -> crate::encoder::Encoder<W> {
262        crate::encoder::Encoder::from_decoder(
263            self.registry,
264            self.string_pool,
265            self.stack_pool,
266            self.timestamp_base_ns,
267            writer,
268        )
269    }
270
271    pub(crate) fn schema_info(&self, type_id: WireTypeId) -> Option<SchemaInfo<'_>> {
272        self.schema_cache
273            .get(type_id.0 as usize)
274            .and_then(|s| s.as_ref())
275            .map(|c| SchemaInfo {
276                field_tags: &c.field_tags,
277                has_timestamp: c.entry.has_timestamp,
278            })
279    }
280
281    fn register_schema(&mut self, type_id: WireTypeId, entry: SchemaEntry) -> Result<(), String> {
282        let idx = type_id.0 as usize;
283        if idx >= self.schema_cache.len() {
284            self.schema_cache.resize_with(idx + 1, || None);
285        }
286        self.schema_cache[idx] = Some(SchemaCache {
287            field_tags: entry.fields.iter().map(|f| f.field_type as u8).collect(),
288            entry: entry.clone(),
289        });
290        self.registry.register(type_id, entry)
291    }
292
293    /// Decode the next frame. Returns `Ok(None)` when stream is exhausted.
294    /// Returns `Err` if the stream is malformed (e.g. duplicate type_id with
295    /// a different schema).
296    pub fn next_frame(&mut self) -> Result<Option<DecodedFrame>, DecodeError> {
297        if self.pos >= self.data.len() {
298            return Ok(None);
299        }
300        if self.try_consume_reset_header() {
301            return self.next_frame();
302        }
303        let remaining = &self.data[self.pos..];
304        let base = self.timestamp_base_ns;
305        let (frame, consumed) =
306            match codec::decode_frame(remaining, |type_id| self.schema_info(type_id), base) {
307                Some(r) => r,
308                None => return Ok(None),
309            };
310        self.pos += consumed;
311        match frame {
312            Frame::Schema { type_id, entry } => {
313                let result = DecodedFrame::Schema(entry.clone());
314                self.register_schema(type_id, entry)
315                    .map_err(|msg| DecodeError {
316                        pos: self.pos,
317                        message: msg,
318                    })?;
319                Ok(Some(result))
320            }
321            Frame::Event {
322                type_id,
323                timestamp_ns,
324                values,
325            } => {
326                if let Some(ts) = timestamp_ns {
327                    self.timestamp_base_ns = ts;
328                }
329                Ok(Some(DecodedFrame::Event {
330                    type_id,
331                    timestamp_ns,
332                    values,
333                }))
334            }
335            Frame::StringPool(entries) => {
336                for e in &entries {
337                    if let Ok(s) = String::from_utf8(e.data.clone()) {
338                        self.string_pool.insert(InternedString(e.pool_id), s);
339                    }
340                }
341                Ok(Some(DecodedFrame::StringPool(entries)))
342            }
343            Frame::StackPool(entries) => {
344                for e in &entries {
345                    self.stack_pool
346                        .insert(InternedStackFrames(e.pool_id), e.frames.clone().into());
347                }
348                Ok(Some(DecodedFrame::StackPool(entries)))
349            }
350            Frame::TimestampReset(ts) => {
351                self.timestamp_base_ns = ts;
352                self.next_frame() // consume silently, return next real frame
353            }
354            Frame::SchemaAnnotations {
355                type_id,
356                annotations,
357            } => {
358                // Merge annotations into the cached schema (lenient: skip if unknown type_id)
359                if let Some(cache) = self
360                    .schema_cache
361                    .get_mut(type_id.0 as usize)
362                    .and_then(|s| s.as_mut())
363                {
364                    cache.entry.annotations.extend_from_slice(&annotations);
365                }
366                if let Some(entry) = self.registry.schemas.get_mut(&type_id) {
367                    entry.annotations.extend_from_slice(&annotations);
368                }
369                Ok(Some(DecodedFrame::SchemaAnnotations {
370                    type_id,
371                    annotations,
372                }))
373            }
374        }
375    }
376
377    /// Collect all remaining frames. Stops on error or end of stream.
378    pub fn decode_all(&mut self) -> Vec<DecodedFrame> {
379        let mut frames = Vec::new();
380        while let Ok(Some(f)) = self.next_frame() {
381            frames.push(f);
382        }
383        frames
384    }
385
386    /// Decode the next frame without copying field data. Returns `Ok(None)` when
387    /// stream is exhausted. Returns `Err` on malformed data.
388    pub fn next_frame_ref(&mut self) -> Result<Option<DecodedFrameRef<'a>>, DecodeError> {
389        if self.pos >= self.data.len() {
390            return Ok(None);
391        }
392        if self.try_consume_reset_header() {
393            return self.next_frame_ref();
394        }
395        let remaining = &self.data[self.pos..];
396        let base = self.timestamp_base_ns;
397        let (frame, consumed) =
398            match codec::decode_frame_ref(remaining, |type_id| self.schema_info(type_id), base) {
399                Some(r) => r,
400                None => return Ok(None),
401            };
402        self.pos += consumed;
403        match frame {
404            FrameRef::Schema { type_id, entry } => {
405                let result = DecodedFrameRef::Schema(entry.clone());
406                self.register_schema(type_id, entry)
407                    .map_err(|msg| DecodeError {
408                        pos: self.pos,
409                        message: msg,
410                    })?;
411                Ok(Some(result))
412            }
413            FrameRef::Event {
414                type_id,
415                timestamp_ns,
416                values,
417            } => {
418                if let Some(ts) = timestamp_ns {
419                    self.timestamp_base_ns = ts;
420                }
421                Ok(Some(DecodedFrameRef::Event {
422                    type_id,
423                    timestamp_ns,
424                    values,
425                }))
426            }
427            FrameRef::StringPool(entries) => {
428                for e in &entries {
429                    if let Ok(s) = std::str::from_utf8(e.data) {
430                        self.string_pool
431                            .insert(InternedString(e.pool_id), s.to_string());
432                    }
433                }
434                Ok(Some(DecodedFrameRef::StringPool(entries)))
435            }
436            FrameRef::StackPool(entries) => {
437                for e in &entries {
438                    self.stack_pool
439                        .insert(InternedStackFrames(e.pool_id), e.to_stack_frames());
440                }
441                Ok(Some(DecodedFrameRef::StackPool(entries)))
442            }
443            FrameRef::TimestampReset(ts) => {
444                self.timestamp_base_ns = ts;
445                self.next_frame_ref()
446            }
447            FrameRef::SchemaAnnotations {
448                type_id,
449                annotations,
450            } => {
451                if let Some(cache) = self
452                    .schema_cache
453                    .get_mut(type_id.0 as usize)
454                    .and_then(|s| s.as_mut())
455                {
456                    cache.entry.annotations.extend_from_slice(&annotations);
457                }
458                if let Some(entry) = self.registry.schemas.get_mut(&type_id) {
459                    entry.annotations.extend_from_slice(&annotations);
460                }
461                Ok(Some(DecodedFrameRef::SchemaAnnotations {
462                    type_id,
463                    annotations,
464                }))
465            }
466        }
467    }
468
469    /// Collect all remaining frames using zero-copy decoding. Stops on error or end of stream.
470    pub fn decode_all_ref(&mut self) -> Vec<DecodedFrameRef<'a>> {
471        let mut frames = Vec::new();
472        while let Ok(Some(f)) = self.next_frame_ref() {
473            frames.push(f);
474        }
475        frames
476    }
477
478    /// Process all events with a callback, avoiding per-event Vec allocations.
479    /// Schemas and string pools are registered automatically.
480    ///
481    /// The [`RawEvent`] passed to the callback borrows from the decoder's input
482    /// buffer. The `fields` slice is reused across calls, so values cannot be
483    /// stored across iterations without copying.
484    ///
485    /// Returns `Err` if the stream is malformed.
486    pub fn for_each_event(
487        &mut self,
488        mut f: impl for<'f> FnMut(RawEvent<'a, 'f>),
489    ) -> Result<(), DecodeError> {
490        self.try_for_each_event(|ev| {
491            f(ev);
492            Ok::<(), std::convert::Infallible>(())
493        })
494        .map_err(|e| match e {
495            TryForEachError::Decode(d) => d,
496            TryForEachError::User(inf) => match inf {},
497        })
498    }
499
500    /// Like [`for_each_event`](Self::for_each_event), but the callback may
501    /// return an error to stop iteration early.
502    pub fn try_for_each_event<E>(
503        &mut self,
504        mut f: impl for<'f> FnMut(RawEvent<'a, 'f>) -> Result<(), E>,
505    ) -> Result<(), TryForEachError<E>> {
506        let mut values_buf: Vec<FieldValueRef<'a>> = Vec::new();
507        while self.pos < self.data.len() {
508            let remaining = &self.data[self.pos..];
509            let tag = match remaining.first() {
510                Some(t) => *t,
511                None => break,
512            };
513            match tag {
514                codec::TAG_EVENT => {
515                    let mut pos = 1;
516                    let type_id = match remaining.get(pos..pos + 2) {
517                        Some(b) => {
518                            pos += 2;
519                            WireTypeId(u16::from_le_bytes(b.try_into().unwrap()))
520                        }
521                        None => {
522                            return Err(TryForEachError::Decode(DecodeError {
523                                pos: self.pos,
524                                message: "truncated event frame".into(),
525                            }));
526                        }
527                    };
528                    let cache = match self
529                        .schema_cache
530                        .get(type_id.0 as usize)
531                        .and_then(|s| s.as_ref())
532                    {
533                        Some(c) => c,
534                        None => {
535                            return Err(TryForEachError::Decode(DecodeError {
536                                pos: self.pos,
537                                message: format!("unknown type_id {type_id:?}"),
538                            }));
539                        }
540                    };
541
542                    let timestamp_ns = if cache.entry.has_timestamp {
543                        match codec::decode_u24_le(&remaining[pos..]) {
544                            Some(delta) => {
545                                pos += 3;
546                                Some(self.timestamp_base_ns + delta as u64)
547                            }
548                            None => {
549                                return Err(TryForEachError::Decode(DecodeError {
550                                    pos: self.pos + pos,
551                                    message: "truncated timestamp delta".into(),
552                                }));
553                            }
554                        }
555                    } else {
556                        None
557                    };
558
559                    values_buf.clear();
560                    for &ftag in &cache.field_tags {
561                        let inner_type = match FieldType::from_tag(ftag) {
562                            Some(ft) => ft,
563                            None => {
564                                return Err(TryForEachError::Decode(DecodeError {
565                                    pos: self.pos + pos,
566                                    message: format!("unknown field type tag {ftag:#x}"),
567                                }));
568                            }
569                        };
570                        if inner_type.is_optional() {
571                            match remaining.get(pos) {
572                                Some(0x00) => {
573                                    values_buf.push(FieldValueRef::None);
574                                    pos += 1;
575                                }
576                                Some(_) => {
577                                    pos += 1;
578                                    match FieldValueRef::decode(inner_type.inner(), remaining, pos)
579                                    {
580                                        Some((val, consumed)) => {
581                                            values_buf.push(val);
582                                            pos += consumed;
583                                        }
584                                        None => {
585                                            return Err(TryForEachError::Decode(DecodeError {
586                                                pos: self.pos + pos,
587                                                message: "truncated optional field value".into(),
588                                            }));
589                                        }
590                                    }
591                                }
592                                None => {
593                                    return Err(TryForEachError::Decode(DecodeError {
594                                        pos: self.pos + pos,
595                                        message: "truncated optional field prefix".into(),
596                                    }));
597                                }
598                            }
599                        } else {
600                            match FieldValueRef::decode(inner_type, remaining, pos) {
601                                Some((val, consumed)) => {
602                                    values_buf.push(val);
603                                    pos += consumed;
604                                }
605                                None => {
606                                    return Err(TryForEachError::Decode(DecodeError {
607                                        pos: self.pos + pos,
608                                        message: "truncated field value".into(),
609                                    }));
610                                }
611                            }
612                        }
613                    }
614                    // Update mutable state. The borrow checker allows this
615                    // because `cache` borrows `self.schema_cache` while we
616                    // mutate `self.pos` and `self.timestamp_base_ns`, which
617                    // are disjoint fields. We use a block with destructured
618                    // refs to make this explicit.
619                    {
620                        let Self {
621                            pos: self_pos,
622                            timestamp_base_ns,
623                            ..
624                        } = self;
625                        *self_pos += pos;
626                        if let Some(ts) = timestamp_ns {
627                            *timestamp_base_ns = ts;
628                        }
629                    }
630                    f(RawEvent {
631                        type_id,
632                        name: &cache.entry.name,
633                        timestamp_ns,
634                        fields: &values_buf,
635                        schema: &cache.entry,
636                        string_pool: &self.string_pool,
637                        stack_pool: &self.stack_pool,
638                    })
639                    .map_err(TryForEachError::User)?;
640                }
641                codec::TAG_TIMESTAMP_RESET => {
642                    let ts = match self.data.get(self.pos + 1..self.pos + 9) {
643                        Some(b) => u64::from_le_bytes(b.try_into().unwrap()),
644                        None => {
645                            return Err(TryForEachError::Decode(DecodeError {
646                                pos: self.pos,
647                                message: "truncated timestamp reset".into(),
648                            }));
649                        }
650                    };
651                    self.timestamp_base_ns = ts;
652                    self.pos += 9;
653                }
654                _ => {
655                    // Mid-stream header = reset frame (tag 0x54 = 'T' from TRC\0)
656                    if tag == codec::MAGIC[0] && self.try_consume_reset_header() {
657                        continue;
658                    }
659                    match self.next_frame_ref() {
660                        Ok(Some(_)) => {}
661                        Ok(None) => {
662                            return Err(TryForEachError::Decode(DecodeError {
663                                pos: self.pos,
664                                message: format!("failed to decode frame with tag 0x{tag:02x}"),
665                            }));
666                        }
667                        Err(e) => return Err(TryForEachError::Decode(e)),
668                    }
669                }
670            }
671        }
672        Ok(())
673    }
674
    /// Returns an iterator that yields only [`DecodedFrameRef::Event`] variants,
    /// silently consuming schema, string-pool, stack-pool, and
    /// schema-annotation frames (while still updating internal decoder state).
    ///
    /// Decode errors are passed through as `Err` items.
    pub fn events(&mut self) -> EventIter<'_, 'a> {
        EventIter { decoder: self }
    }
681}
682
683impl<'a> Iterator for Decoder<'a> {
684    type Item = Result<DecodedFrameRef<'a>, DecodeError>;
685
686    fn next(&mut self) -> Option<Self::Item> {
687        self.next_frame_ref().transpose()
688    }
689}
690
691/// Iterator that yields only [`DecodedFrameRef::Event`] frames,
692/// consuming non-event frames to keep decoder state up to date.
693pub struct EventIter<'d, 'a> {
694    decoder: &'d mut Decoder<'a>,
695}
696
697impl<'d, 'a> Iterator for EventIter<'d, 'a> {
698    type Item = Result<DecodedFrameRef<'a>, DecodeError>;
699
700    fn next(&mut self) -> Option<Self::Item> {
701        loop {
702            match self.decoder.next()? {
703                Ok(frame @ DecodedFrameRef::Event { .. }) => return Some(Ok(frame)),
704                Ok(_) => continue, // schema, string pool, symbol table — skip
705                Err(e) => return Some(Err(e)),
706            }
707        }
708    }
709}
710
#[cfg(test)]
mod tests {
    use super::*;
    use crate::encoder::Encoder;
    use crate::schema::FieldDef;
    use crate::types::{FieldType, FieldValue};

    /// Field list shared by every schema in these tests: one varint named `v`.
    fn single_varint_field() -> Vec<FieldDef> {
        vec![FieldDef {
            name: "v".into(),
            field_type: FieldType::Varint,
        }]
    }

    /// A stream holding only the header reports version 1 and no frames.
    #[test]
    fn decode_empty_stream() {
        let enc = Encoder::new();
        let data = enc.finish();
        let mut dec = Decoder::new(&data).unwrap();
        assert_eq!(dec.version(), 1);
        let first = dec.next_frame().unwrap();
        assert!(first.is_none());
    }

    /// Registering a schema produces a schema frame carrying its name.
    #[test]
    fn decode_schema_frame() {
        let mut enc = Encoder::new();
        enc.register_schema("Ev", single_varint_field()).unwrap();
        let data = enc.finish();
        let mut dec = Decoder::new(&data).unwrap();
        let frame = dec.next_frame().unwrap().unwrap();
        assert!(matches!(frame, DecodedFrame::Schema(s) if s.name == "Ev"));
    }

    /// An event written after its schema decodes to the expected field values.
    #[test]
    fn decode_event_after_schema() {
        let mut enc = Encoder::new();
        let schema = enc.register_schema("Ev", single_varint_field()).unwrap();
        enc.write_event(
            &schema,
            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
        )
        .unwrap();
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        let frames = dec.decode_all();
        assert_eq!(frames.len(), 2);
        match &frames[1] {
            DecodedFrame::Event { values, .. } => {
                assert_eq!(*values, vec![FieldValue::Varint(42)]);
            }
            _ => panic!("expected event"),
        }
    }

    /// Decoding a string-pool frame makes the string resolvable by id.
    #[test]
    fn decode_string_pool_builds_map() {
        let mut enc = Encoder::new();
        let id = enc.intern_string("hello").unwrap();
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        dec.decode_all();
        assert_eq!(dec.string_pool().get(id), Some("hello"));
    }

    /// One schema frame plus ten event frames are all decoded.
    #[test]
    fn decode_multiple_events() {
        let mut enc = Encoder::new();
        let schema = enc.register_schema("Ev", single_varint_field()).unwrap();
        for i in 0..10u64 {
            enc.write_event(
                &schema,
                &[FieldValue::Varint(i * 1000), FieldValue::Varint(i)],
            )
            .unwrap();
        }
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        let frames = dec.decode_all();
        assert_eq!(frames.len(), 11);
    }

    /// Input that does not start with a valid header is rejected.
    #[test]
    fn bad_header_returns_none() {
        let bogus = [0x00, 0x00, 0x00, 0x00, 1];
        assert!(Decoder::new(&bogus).is_none());
    }

    /// The `Iterator` impl yields the schema frame followed by every event.
    #[test]
    fn iterator_yields_all_frames() {
        let mut enc = Encoder::new();
        let schema = enc.register_schema("Ev", single_varint_field()).unwrap();
        for i in 0..3u64 {
            enc.write_event(
                &schema,
                &[FieldValue::Varint(i * 1000), FieldValue::Varint(i)],
            )
            .unwrap();
        }
        let data = enc.finish();

        let dec = Decoder::new(&data).unwrap();
        let frames: Vec<_> = dec.collect::<Result<Vec<_>, _>>().unwrap();
        // 1 schema + 3 events
        assert_eq!(frames.len(), 4);
        assert!(matches!(frames[0], DecodedFrameRef::Schema(_)));
        assert!(matches!(frames[1], DecodedFrameRef::Event { .. }));
    }

    /// Taking only a prefix of the iterator leaves the rest decodable.
    #[test]
    fn iterator_early_termination() {
        let mut enc = Encoder::new();
        let schema = enc.register_schema("Ev", single_varint_field()).unwrap();
        for i in 0..10u64 {
            enc.write_event(
                &schema,
                &[FieldValue::Varint(i * 1000), FieldValue::Varint(i)],
            )
            .unwrap();
        }
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        // Take just 2 frames (schema + first event), don't decode the rest
        let first_two: Vec<_> = dec.by_ref().take(2).collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(first_two.len(), 2);
        // Decoder should still have remaining data
        let next = dec.next();
        assert!(next.is_some());
    }

    /// `events()` hides the schema frame and yields only events.
    #[test]
    fn events_iterator_skips_schema() {
        let mut enc = Encoder::new();
        let schema = enc.register_schema("Ev", single_varint_field()).unwrap();
        enc.write_event(
            &schema,
            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
        )
        .unwrap();
        enc.write_event(
            &schema,
            &[FieldValue::Varint(2_000), FieldValue::Varint(99)],
        )
        .unwrap();
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        let events: Vec<_> = dec.events().collect::<Result<Vec<_>, _>>().unwrap();
        // Only events, no schema frame
        assert_eq!(events.len(), 2);
        assert!(events
            .iter()
            .all(|ev| matches!(ev, DecodedFrameRef::Event { .. })));
    }

    /// The first item from `events()` is an event even though a schema precedes it.
    #[test]
    fn events_iterator_first_event_only() {
        let mut enc = Encoder::new();
        let schema = enc.register_schema("Ev", single_varint_field()).unwrap();
        for i in 0..5u64 {
            enc.write_event(
                &schema,
                &[FieldValue::Varint(i * 1000), FieldValue::Varint(i)],
            )
            .unwrap();
        }
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        // Get just the first event — schema is consumed internally
        let first = dec.events().next().unwrap().unwrap();
        assert!(matches!(first, DecodedFrameRef::Event { .. }));
    }
}
933}