Skip to main content

dial9_trace_format/
encoder.rs

1//! High-level encoder for writing trace files.
2//!
3//! [`Encoder`] writes the file header, registers schemas, interns strings, and
4//! encodes events with delta-compressed timestamps. It is the primary entry
5//! point for producing trace data.
6
7use crate::TraceEvent;
8use crate::codec::{self, PoolEntry, StackPoolEntry, WireTypeId};
9use crate::schema::{SchemaEntry, SchemaRegistry};
10use crate::types::{
11    CountingWriter, EncodeState, EventEncoder, InternedStackFrames, InternedString,
12};
13use std::any::TypeId;
14use std::collections::HashMap;
15use std::hash::{BuildHasherDefault, Hasher};
16use std::io::{self, Write};
17use std::sync::Arc;
18
19/// A fast, non-cryptographic hasher using FxHash's multiply-shift strategy.
20///
21/// For HashMap keys that are already well-distributed (TypeId, Arc<str>), this
22/// avoids hash collisions.
23#[doc(hidden)]
24#[derive(Default)]
25pub struct FxHasher(u64);
26
27impl FxHasher {
28    #[inline]
29    fn hash_word(&mut self, word: u64) {
30        self.0 = (self.0.rotate_left(5) ^ word).wrapping_mul(0x517cc1b727220a95);
31    }
32}
33
34impl Hasher for FxHasher {
35    #[inline]
36    fn write(&mut self, mut bytes: &[u8]) {
37        while bytes.len() >= 8 {
38            self.hash_word(u64::from_ne_bytes(bytes[..8].try_into().unwrap()));
39            bytes = &bytes[8..];
40        }
41        if bytes.len() >= 4 {
42            self.hash_word(u32::from_ne_bytes(bytes[..4].try_into().unwrap()) as u64);
43            bytes = &bytes[4..];
44        }
45        for &b in bytes {
46            self.hash_word(b as u64);
47        }
48    }
49
50    #[inline]
51    fn write_u8(&mut self, i: u8) {
52        self.hash_word(i as u64);
53    }
54
55    #[inline]
56    fn write_u16(&mut self, i: u16) {
57        self.hash_word(i as u64);
58    }
59
60    #[inline]
61    fn write_u32(&mut self, i: u32) {
62        self.hash_word(i as u64);
63    }
64
65    #[inline]
66    fn write_u64(&mut self, i: u64) {
67        self.hash_word(i);
68    }
69
70    #[inline]
71    fn write_usize(&mut self, i: usize) {
72        self.hash_word(i as u64);
73    }
74
75    #[inline]
76    fn write_u128(&mut self, i: u128) {
77        self.hash_word(i as u64);
78        self.hash_word((i >> 64) as u64);
79    }
80
81    #[inline]
82    fn finish(&self) -> u64 {
83        self.0
84    }
85}
86
87#[doc(hidden)]
88pub type FxBuildHasher = BuildHasherDefault<FxHasher>;
89#[doc(hidden)]
90pub type FxHashMap<K, V> = HashMap<K, V, FxBuildHasher>;
91
92/// A schema handle returned by [`Encoder::register_schema`] or created via
93/// [`Schema::new`].
94///
95/// Carries the full schema definition (name + fields) so it can auto-register
96/// itself with any encoder on first use. This means a `Schema` created on one
97/// encoder can be passed to a different encoder and it will just work.
98///
99/// `Schema` is cheap to clone (internally `Arc`-backed).
100#[derive(Clone, Debug)]
101pub struct Schema {
102    pub(crate) entry: Arc<SchemaEntry>,
103    /// Pre-computed `Arc<str>` of the schema name, used as a cheap HashMap key
104    /// (clone is a pointer bump instead of a String allocation).
105    name_key: Arc<str>,
106}
107
108impl Schema {
109    /// Create a schema handle without an encoder.
110    ///
111    /// The schema will be lazily registered the first time it is passed to
112    /// [`Encoder::write_event`].
113    pub fn new(name: &str, fields: Vec<crate::schema::FieldDef>) -> Self {
114        let name_key: Arc<str> = Arc::from(name);
115        Self {
116            entry: Arc::new(SchemaEntry {
117                name: name.to_string(),
118                has_timestamp: true,
119                fields,
120            }),
121            name_key,
122        }
123    }
124
125    /// Schema name.
126    pub fn name(&self) -> &str {
127        &self.entry.name
128    }
129
130    /// Schema field definitions.
131    pub fn fields(&self) -> &[crate::schema::FieldDef] {
132        &self.entry.fields
133    }
134}
135
136/// Key for schema lookup — either by name (manual registration) or by Rust
137/// `TypeId` (derive macro path).
138#[derive(Clone, PartialEq, Eq, Hash)]
139enum SchemaKey {
140    Name(Arc<str>),
141    RustType(TypeId),
142}
143
144/// Trace file encoder.
145///
146/// Writes the binary file header, registers event schemas, interns strings
147/// into a pool, and encodes events with delta-compressed timestamps.
148///
149/// The default type parameter (`Vec<u8>`) buffers everything in memory;
150/// use [`Encoder::new_to`] to write to an arbitrary [`Write`] sink.
151pub struct Encoder<W: Write = Vec<u8>> {
152    state: EncodeState<W>,
153    registry: SchemaRegistry,
154    string_pool: FxHashMap<String, u32>,
155    next_pool_id: u32,
156    stack_pool: FxHashMap<Box<[u64]>, u32>,
157    next_stack_pool_id: u32,
158    schema_ids: FxHashMap<SchemaKey, WireTypeId>,
159}
160
161impl Default for Encoder<Vec<u8>> {
162    fn default() -> Self {
163        Self::new()
164    }
165}
166
167impl Encoder<Vec<u8>> {
168    pub fn new() -> Self {
169        let mut buf = Vec::new();
170        codec::encode_header(&mut buf).expect("Vec::write_all cannot fail");
171        Self {
172            state: EncodeState::new(buf),
173            registry: SchemaRegistry::new(),
174            string_pool: FxHashMap::default(),
175            next_pool_id: 0,
176            stack_pool: FxHashMap::default(),
177            next_stack_pool_id: 0,
178            schema_ids: FxHashMap::default(),
179        }
180    }
181
182    /// Consume the encoder and return the encoded bytes.
183    pub fn finish(self) -> Vec<u8> {
184        self.state.writer.into_inner()
185    }
186}
187
188impl<W: Write> Encoder<W> {
189    /// Create an encoder that writes to an arbitrary writer.
190    /// Writes the file header immediately.
191    pub fn new_to(mut writer: W) -> io::Result<Self> {
192        codec::encode_header(&mut writer)?;
193        Ok(Self {
194            state: EncodeState::new(writer),
195            registry: SchemaRegistry::new(),
196            string_pool: FxHashMap::default(),
197            next_pool_id: 0,
198            stack_pool: FxHashMap::default(),
199            next_stack_pool_id: 0,
200            schema_ids: FxHashMap::default(),
201        })
202    }
203
204    /// Create an encoder seeded from decoded state. Used by
205    /// [`Decoder::into_encoder`](crate::decoder::Decoder::into_encoder).
206    pub(crate) fn from_decoder(
207        mut registry: SchemaRegistry,
208        string_pool: crate::decoder::StringPool,
209        stack_pool: crate::decoder::StackPool,
210        timestamp_base_ns: u64,
211        writer: W,
212    ) -> Self {
213        let mut pool = FxHashMap::default();
214        let mut next_pool_id: u32 = 0;
215        for (id, value) in string_pool.0.into_iter() {
216            pool.insert(value, id.raw_id());
217            if id.raw_id() >= next_pool_id {
218                next_pool_id = id.raw_id() + 1;
219            }
220        }
221
222        let mut new_stack_pool: FxHashMap<Box<[u64]>, u32> = FxHashMap::default();
223        let mut next_stack_pool_id: u32 = 0;
224        for (id, frames) in stack_pool.0.into_iter() {
225            new_stack_pool.insert(frames.into_boxed_slice(), id.raw_id());
226            if id.raw_id() >= next_stack_pool_id {
227                next_stack_pool_id = id.raw_id() + 1;
228            }
229        }
230
231        let mut schema_ids = FxHashMap::default();
232        for (wire_id, entry) in registry.entries() {
233            schema_ids.insert(SchemaKey::Name(Arc::from(entry.name.as_str())), wire_id);
234        }
235        registry.sync_next_id();
236
237        let mut state = EncodeState::new(writer);
238        state.set_ts_base_unchecked(timestamp_base_ns);
239
240        Self {
241            state,
242            registry,
243            string_pool: pool,
244            next_pool_id,
245            stack_pool: new_stack_pool,
246            next_stack_pool_id,
247            schema_ids,
248        }
249    }
250
251    /// Consume the encoder and return the inner writer.
252    pub fn into_inner(self) -> W {
253        self.state.writer.into_inner()
254    }
255
256    /// Borrow the inner writer.
257    pub fn as_inner(&self) -> &W {
258        self.state.writer.inner()
259    }
260
261    /// Total bytes written through this encoder (including the file header).
262    pub fn bytes_written(&self) -> u64 {
263        self.state.writer.bytes_written()
264    }
265
266    /// Reset the encoder to a new writer, preserving internal allocations.
267    /// Returns the old writer. Writes a file header to the new writer.
268    pub fn reset_to(&mut self, mut new_writer: W) -> io::Result<W> {
269        codec::encode_header(&mut new_writer)?;
270        self.string_pool.clear();
271        self.next_pool_id = 0;
272        self.stack_pool.clear();
273        self.next_stack_pool_id = 0;
274        self.registry.clear();
275        self.schema_ids.clear();
276        // creating a new EncodeState resets the timestamp delta
277        let old_state = std::mem::replace(&mut self.state, EncodeState::new(new_writer));
278        Ok(old_state.writer.into_inner())
279    }
280
281    /// Ensure a schema is registered with this encoder. Returns the wire type
282    /// ID for this encoder's output stream.
283    ///
284    /// Idempotent if the schema matches. Errors if a different schema was
285    /// already registered under the same name.
286    fn ensure_registered(&mut self, schema: &Schema) -> io::Result<WireTypeId> {
287        let key = SchemaKey::Name(Arc::clone(&schema.name_key));
288        if let Some(&wire_id) = self.schema_ids.get(&key) {
289            // TODO: unify registry and schema_ids to avoid this error case
290            let Some(existing) = self.registry.get(wire_id) else {
291                return Err(io::Error::other(format!(
292                    "corrupted internal state. {wire_id:?} in schema_ids but not in registry."
293                )));
294            };
295            if *existing == *schema.entry {
296                return Ok(wire_id);
297            }
298            return Err(io::Error::new(
299                io::ErrorKind::InvalidInput,
300                format!(
301                    "schema already registered with different definition: {}",
302                    schema.name()
303                ),
304            ));
305        }
306        let id = self.registry.next_type_id();
307        codec::encode_schema(id, &schema.entry, &mut self.state.writer)?;
308        self.registry
309            .register(id, (*schema.entry).clone())
310            .expect("schema registration failed");
311        self.schema_ids.insert(key, id);
312        Ok(id)
313    }
314
315    /// Register a schema by name. Returns a [`Schema`] handle that can be
316    /// passed to [`write_event`](Self::write_event) (on this or any other
317    /// encoder).
318    ///
319    /// All schemas have timestamps. When writing events, the first element of
320    /// `values` must be `FieldValue::Varint(timestamp_ns)`. It is extracted and
321    /// encoded in the event header (not as a regular field).
322    ///
323    /// Eagerly writes the schema frame. Idempotent if the definition matches.
324    pub fn register_schema(
325        &mut self,
326        name: &str,
327        fields: Vec<crate::schema::FieldDef>,
328    ) -> io::Result<Schema> {
329        let schema = Schema::new(name, fields);
330        self.ensure_registered(&schema)?;
331        Ok(schema)
332    }
333
334    /// Write an event for a schema.
335    ///
336    /// The first element of `values` must be `FieldValue::Varint(timestamp_ns)`
337    /// — it is extracted and encoded in the event header, not as a regular
338    /// field. The remaining values must match the schema's field count.
339    ///
340    /// If this encoder hasn't seen `schema` before, it is auto-registered
341    /// (the schema frame is written before the event).
342    pub fn write_event(
343        &mut self,
344        schema: &Schema,
345        values: &[crate::types::FieldValue],
346    ) -> io::Result<()> {
347        use crate::types::FieldValue;
348
349        let type_id = self.ensure_registered(schema)?;
350        let expected_fields = schema.entry.fields.len();
351
352        let ts_ns = match values.first() {
353            Some(FieldValue::Varint(ns)) => *ns,
354            _ => {
355                return Err(io::Error::new(
356                    io::ErrorKind::InvalidInput,
357                    "first value must be FieldValue::Varint(timestamp_ns)",
358                ));
359            }
360        };
361        let field_values = &values[1..];
362
363        if field_values.len() != expected_fields {
364            return Err(io::Error::new(
365                io::ErrorKind::InvalidInput,
366                format!(
367                    "value count ({}) does not match schema field count ({}) for schema '{}'",
368                    field_values.len(),
369                    expected_fields,
370                    schema.name(),
371                ),
372            ));
373        }
374
375        let ts_delta = self.state.encode_timestamp_delta(ts_ns)?;
376        self.state.writer.write_all(&[codec::TAG_EVENT])?;
377        self.state.writer.write_all(&type_id.0.to_le_bytes())?;
378        codec::encode_u24_le(ts_delta, &mut self.state.writer)?;
379        let mut enc = EventEncoder::new(&mut self.state);
380        for (i, v) in field_values.iter().enumerate() {
381            enc.write_field_value(v, schema.entry.fields[i].field_type)?;
382        }
383        Ok(())
384    }
385
386    /// Write a derived TraceEvent. Auto-registers the schema on first call for this type.
387    /// Handles timestamp encoding: emits TimestampReset if needed, packs u24 delta in header.
388    pub fn write<T: TraceEvent + 'static>(&mut self, event: &T) -> io::Result<()> {
389        let key = SchemaKey::RustType(TypeId::of::<T>());
390        let tid = if let Some(&cached) = self.schema_ids.get(&key) {
391            cached
392        } else {
393            let entry = T::schema_entry();
394            let schema = Schema::new(&entry.name, entry.fields);
395            let id = self.ensure_registered(&schema)?;
396            self.schema_ids.insert(key, id);
397            id
398        };
399        let ts_ns = event.timestamp();
400        let ts_delta = self.state.encode_timestamp_delta(ts_ns)?;
401        self.state.writer.write_all(&[codec::TAG_EVENT])?;
402        self.state.writer.write_all(&tid.0.to_le_bytes())?;
403        codec::encode_u24_le(ts_delta, &mut self.state.writer)?;
404        let mut enc = EventEncoder::new(&mut self.state);
405        event.encode_fields(&mut enc)
406    }
407
408    /// Intern a string, emitting a pool frame if new. Returns an [`InternedString`] handle.
409    pub fn intern_string(&mut self, s: &str) -> io::Result<InternedString> {
410        if let Some(&id) = self.string_pool.get(s) {
411            return Ok(InternedString(id));
412        }
413        let id = self.next_pool_id;
414        self.next_pool_id += 1;
415        self.string_pool.insert(s.to_string(), id);
416        codec::encode_string_pool(
417            &[PoolEntry {
418                pool_id: id,
419                data: s.as_bytes().to_vec(),
420            }],
421            &mut self.state.writer,
422        )?;
423        Ok(InternedString(id))
424    }
425
426    pub fn write_string_pool(&mut self, entries: &[PoolEntry]) -> io::Result<()> {
427        codec::encode_string_pool(entries, &mut self.state.writer)
428    }
429
430    /// Intern a stack-frame vector, emitting a stack-pool frame if new.
431    /// Returns an [`InternedStackFrames`] handle.
432    pub fn intern_stack_frames(&mut self, frames: &[u64]) -> io::Result<InternedStackFrames> {
433        if let Some(&id) = self.stack_pool.get(frames) {
434            return Ok(InternedStackFrames(id));
435        }
436        let id = self.next_stack_pool_id;
437        self.next_stack_pool_id += 1;
438        self.stack_pool.insert(frames.into(), id);
439        codec::encode_stack_pool(
440            &[StackPoolEntry {
441                pool_id: id,
442                frames: frames.to_vec(),
443            }],
444            &mut self.state.writer,
445        )?;
446        Ok(InternedStackFrames(id))
447    }
448
449    pub fn write_stack_pool(&mut self, entries: &[StackPoolEntry]) -> io::Result<()> {
450        codec::encode_stack_pool(entries, &mut self.state.writer)
451    }
452
453    /// Flush the underlying writer.
454    pub fn flush(&mut self) -> io::Result<()> {
455        self.state.writer.flush()
456    }
457
458    /// Convert this encoder into a [`RawEncoder`] that only supports writing
459    /// pre-encoded bytes. The byte count is preserved so rotation decisions
460    /// remain correct.
461    ///
462    /// Use this after writing any structured data (headers, segment metadata)
463    /// to switch to a raw-only mode for appending pre-encoded batches.
464    pub fn into_raw_encoder(self) -> RawEncoder<W> {
465        RawEncoder {
466            writer: self.state.writer,
467        }
468    }
469}
470
471/// A write-only encoder that accepts pre-encoded bytes.
472///
473/// Created by [`Encoder::into_raw_encoder`] after the file header and any
474/// structured metadata have been written. Carries no schema registry, string
475/// pool, or timestamp state — it simply forwards bytes to the underlying
476/// writer while tracking the total byte count.
477pub struct RawEncoder<W> {
478    writer: CountingWriter<W>,
479}
480
481impl<W: Write> RawEncoder<W> {
482    /// Write pre-encoded bytes to the underlying writer.
483    pub fn write_raw(&mut self, bytes: &[u8]) -> io::Result<()> {
484        self.writer.write_all(bytes)
485    }
486
487    /// Total bytes written (including bytes written by the [`Encoder`] before
488    /// conversion).
489    pub fn bytes_written(&self) -> u64 {
490        self.writer.bytes_written()
491    }
492
493    /// Flush the underlying writer.
494    pub fn flush(&mut self) -> io::Result<()> {
495        self.writer.flush()
496    }
497
498    /// Consume the raw encoder and return the inner writer.
499    pub fn into_inner(self) -> W {
500        self.writer.into_inner()
501    }
502}
503
504impl Encoder<Vec<u8>> {
505    pub fn write_infallible<T: TraceEvent + 'static>(&mut self, event: &T) {
506        self.write(event).expect("writing to Vec<u8> is infallible")
507    }
508
509    pub fn intern_string_infallible(&mut self, s: &str) -> InternedString {
510        self.intern_string(s)
511            .expect("interning into Vec<u8> is infallible")
512    }
513
514    pub fn intern_stack_frames_infallible(&mut self, frames: &[u64]) -> InternedStackFrames {
515        self.intern_stack_frames(frames)
516            .expect("interning into Vec<u8> is infallible")
517    }
518
519    /// Resets the encoder to point to a new backing Vec returning the old one
520    pub fn reset_to_infallible(&mut self, data: Vec<u8>) -> Vec<u8> {
521        self.reset_to(data)
522            .expect("writing to Vec<u8> is infallible")
523    }
524}
525
526#[cfg(test)]
527mod tests {
528    use super::*;
529    use crate::schema::FieldDef;
530    use crate::types::{FieldType, FieldValue};
531
532    #[test]
533    fn encoder_writes_header() {
534        let enc = Encoder::new();
535        let data = enc.finish();
536        assert_eq!(&data[..5], &[0x54, 0x52, 0x43, 0x00, 1]);
537    }
538
539    #[test]
540    fn encoder_register_and_write_event() {
541        let mut enc = Encoder::new();
542        let schema = enc
543            .register_schema(
544                "Ev",
545                vec![FieldDef {
546                    name: "v".into(),
547                    field_type: FieldType::Varint,
548                }],
549            )
550            .unwrap();
551        enc.write_event(
552            &schema,
553            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
554        )
555        .unwrap();
556        let data = enc.finish();
557        assert!(data.len() > 5);
558    }
559
560    #[test]
561    fn idempotent_re_registration() {
562        let mut enc = Encoder::new();
563        let fields = vec![FieldDef {
564            name: "v".into(),
565            field_type: FieldType::Varint,
566        }];
567        let _s1 = enc.register_schema("Ev", fields.clone()).unwrap();
568        let _s2 = enc.register_schema("Ev", fields).unwrap();
569        // Both succeed — same schema, same name
570    }
571
572    #[test]
573    fn re_registration_different_schema_errors() {
574        let mut enc = Encoder::new();
575        enc.register_schema(
576            "Ev",
577            vec![FieldDef {
578                name: "v".into(),
579                field_type: FieldType::Varint,
580            }],
581        )
582        .unwrap();
583        let result = enc.register_schema(
584            "Ev",
585            vec![FieldDef {
586                name: "different".into(),
587                field_type: FieldType::Bool,
588            }],
589        );
590        assert!(result.is_err());
591    }
592
593    #[test]
594    fn schema_auto_registers_on_write() {
595        use crate::decoder::{DecodedFrame, Decoder};
596
597        // Create a schema without an encoder
598        let schema = Schema::new(
599            "Lazy",
600            vec![FieldDef {
601                name: "v".into(),
602                field_type: FieldType::Varint,
603            }],
604        );
605
606        // Write to an encoder that hasn't seen this schema — auto-registers
607        let mut enc = Encoder::new();
608        enc.write_event(
609            &schema,
610            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
611        )
612        .unwrap();
613
614        let bytes = enc.finish();
615        let mut dec = Decoder::new(&bytes).unwrap();
616        let frames = dec.decode_all();
617        assert!(matches!(&frames[0], DecodedFrame::Schema(s) if s.name == "Lazy"));
618        if let DecodedFrame::Event { values, .. } = &frames[1] {
619            assert_eq!(*values, vec![FieldValue::Varint(42)]);
620        } else {
621            panic!("expected event");
622        }
623    }
624
625    #[test]
626    fn schema_portable_across_encoders() {
627        use crate::decoder::{DecodedFrame, Decoder};
628
629        let mut enc1 = Encoder::new();
630        let schema = enc1
631            .register_schema(
632                "Shared",
633                vec![FieldDef {
634                    name: "v".into(),
635                    field_type: FieldType::Varint,
636                }],
637            )
638            .unwrap();
639        enc1.write_event(&schema, &[FieldValue::Varint(1_000), FieldValue::Varint(1)])
640            .unwrap();
641
642        // Pass the same Schema to a different encoder
643        let mut enc2 = Encoder::new();
644        enc2.write_event(&schema, &[FieldValue::Varint(2_000), FieldValue::Varint(2)])
645            .unwrap();
646
647        // Both encoders produce valid output
648        for (enc, expected_val) in [(enc1, 1u64), (enc2, 2u64)] {
649            let bytes = enc.finish();
650            let mut dec = Decoder::new(&bytes).unwrap();
651            let frames = dec.decode_all();
652            let event = frames
653                .iter()
654                .find(|f| matches!(f, DecodedFrame::Event { .. }))
655                .unwrap();
656            if let DecodedFrame::Event { values, .. } = event {
657                assert_eq!(values[0], FieldValue::Varint(expected_val));
658            }
659        }
660    }
661
662    #[test]
663    fn encoder_intern_string_deduplicates() {
664        let mut enc = Encoder::new();
665        let id1 = enc.intern_string("hello").unwrap();
666        let id2 = enc.intern_string("hello").unwrap();
667        let id3 = enc.intern_string("world").unwrap();
668        assert_eq!(id1, id2);
669        assert_ne!(id1, id3);
670    }
671
672    #[test]
673    fn encoder_intern_stack_frames_deduplicates() {
674        let mut enc = Encoder::new();
675        let stack_a: &[u64] = &[0x1000, 0x2000, 0x3000];
676        let stack_b: &[u64] = &[0x4000, 0x5000];
677        let id1 = enc.intern_stack_frames(stack_a).unwrap();
678        let id2 = enc.intern_stack_frames(stack_a).unwrap();
679        let id3 = enc.intern_stack_frames(stack_b).unwrap();
680        assert_eq!(id1, id2);
681        assert_ne!(id1, id3);
682    }
683
684    #[test]
685    fn stack_pool_round_trip_via_decoder() {
686        use crate::decoder::Decoder;
687        use crate::types::InternedStackFrames;
688
689        let mut enc = Encoder::new();
690        let stack_a: &[u64] = &[0xdead, 0xbeef, 0xcafe];
691        let stack_b: &[u64] = &[0x1, 0x2];
692        let id_a = enc.intern_stack_frames(stack_a).unwrap();
693        let id_b = enc.intern_stack_frames(stack_b).unwrap();
694        let bytes = enc.finish();
695
696        let mut dec = Decoder::new(&bytes).unwrap();
697        let _ = dec.decode_all();
698        assert_eq!(
699            dec.stack_pool().get(InternedStackFrames(id_a.raw_id())),
700            Some(stack_a)
701        );
702        assert_eq!(
703            dec.stack_pool().get(InternedStackFrames(id_b.raw_id())),
704            Some(stack_b)
705        );
706    }
707
708    #[test]
709    fn for_each_event_populates_stack_pool() {
710        use crate::decoder::Decoder;
711        use crate::schema::FieldDef;
712        use crate::types::{FieldType, FieldValue, InternedStackFrames};
713
714        let mut enc = Encoder::new();
715        let schema = enc
716            .register_schema(
717                "CpuSampleEvent",
718                vec![FieldDef {
719                    name: "callchain".into(),
720                    field_type: FieldType::PooledStackFrames,
721                }],
722            )
723            .unwrap();
724        let stack: &[u64] = &[0x1234, 0x5678, 0x9abc];
725        let id = enc.intern_stack_frames(stack).unwrap();
726        enc.write_event(
727            &schema,
728            &[
729                FieldValue::Varint(1_000_000),
730                FieldValue::PooledStackFrames(id),
731            ],
732        )
733        .unwrap();
734        let bytes = enc.finish();
735
736        let mut dec = Decoder::new(&bytes).unwrap();
737        let mut event_count = 0;
738        dec.for_each_event(|_ev| {
739            event_count += 1;
740        })
741        .unwrap();
742        assert_eq!(event_count, 1);
743        assert_eq!(
744            dec.stack_pool().get(InternedStackFrames(id.raw_id())),
745            Some(stack),
746        );
747    }
748
749    #[test]
750    fn encoder_intern_empty_stack_frames() {
751        use crate::decoder::Decoder;
752        use crate::types::InternedStackFrames;
753
754        let mut enc = Encoder::new();
755        let id1 = enc.intern_stack_frames(&[]).unwrap();
756        let id2 = enc.intern_stack_frames(&[]).unwrap();
757        assert_eq!(id1, id2);
758        let bytes = enc.finish();
759
760        let mut dec = Decoder::new(&bytes).unwrap();
761        let _ = dec.decode_all();
762        assert_eq!(
763            dec.stack_pool().get(InternedStackFrames(id1.raw_id())),
764            Some(&[][..])
765        );
766    }
767
768    #[test]
769    fn write_stack_pool_multi_entry_round_trip() {
770        use crate::decoder::Decoder;
771        use crate::types::InternedStackFrames;
772
773        let mut enc = Encoder::new();
774        let entries = vec![
775            StackPoolEntry {
776                pool_id: 0,
777                frames: vec![0xaaaa, 0xbbbb, 0xcccc],
778            },
779            StackPoolEntry {
780                pool_id: 1,
781                frames: vec![0x1111],
782            },
783            StackPoolEntry {
784                pool_id: 2,
785                frames: vec![],
786            },
787        ];
788        enc.write_stack_pool(&entries).unwrap();
789        let bytes = enc.finish();
790
791        let mut dec = Decoder::new(&bytes).unwrap();
792        let _ = dec.decode_all();
793        assert_eq!(
794            dec.stack_pool().get(InternedStackFrames(0)),
795            Some(&[0xaaaa, 0xbbbb, 0xcccc][..])
796        );
797        assert_eq!(
798            dec.stack_pool().get(InternedStackFrames(1)),
799            Some(&[0x1111][..])
800        );
801        assert_eq!(dec.stack_pool().get(InternedStackFrames(2)), Some(&[][..]));
802    }
803
804    #[test]
805    fn decoder_into_encoder_deduplicates_interned_stack_frames() {
806        use crate::decoder::Decoder;
807
808        let mut enc = Encoder::new();
809        let id1 = enc.intern_stack_frames(&[0x10, 0x20]).unwrap();
810        let base = enc.finish();
811
812        let mut decoder = Decoder::new(&base).unwrap();
813        while decoder.next_frame_ref().ok().flatten().is_some() {}
814        let mut output = Vec::new();
815        let mut ext = decoder.into_encoder(&mut output);
816        let id2 = ext.intern_stack_frames(&[0x10, 0x20]).unwrap();
817        let id3 = ext.intern_stack_frames(&[0x30]).unwrap();
818        assert_eq!(id1.raw_id(), id2.raw_id());
819        assert_ne!(id2.raw_id(), id3.raw_id());
820    }
821
822    #[test]
823    fn timestamp_round_trip() {
824        use crate::decoder::{DecodedFrame, Decoder};
825
826        let mut enc = Encoder::new();
827        let schema = enc
828            .register_schema(
829                "TS",
830                vec![FieldDef {
831                    name: "v".into(),
832                    field_type: FieldType::Varint,
833                }],
834            )
835            .unwrap();
836
837        let ts1 = 100_000u64;
838        let ts2 = 50_000u64;
839        let ts3 = 200_000_000u64;
840        let ts4 = 100_000_000u64;
841        enc.write_event(&schema, &[FieldValue::Varint(ts1), FieldValue::Varint(1)])
842            .unwrap();
843        enc.write_event(&schema, &[FieldValue::Varint(ts2), FieldValue::Varint(2)])
844            .unwrap();
845        enc.write_event(&schema, &[FieldValue::Varint(ts3), FieldValue::Varint(3)])
846            .unwrap();
847        enc.write_event(&schema, &[FieldValue::Varint(ts4), FieldValue::Varint(4)])
848            .unwrap();
849
850        let bytes = enc.finish();
851        let mut dec = Decoder::new(&bytes).unwrap();
852        let events: Vec<_> = dec
853            .decode_all()
854            .into_iter()
855            .filter_map(|f| match f {
856                DecodedFrame::Event {
857                    timestamp_ns,
858                    values,
859                    ..
860                } => Some((timestamp_ns, values)),
861                _ => None,
862            })
863            .collect();
864
865        assert_eq!(events.len(), 4);
866        assert_eq!(events[0].0, Some(ts1));
867        assert_eq!(events[0].1, vec![FieldValue::Varint(1)]);
868        assert_eq!(events[1].0, Some(ts2));
869        assert_eq!(events[1].1, vec![FieldValue::Varint(2)]);
870        assert_eq!(events[2].0, Some(ts3));
871        assert_eq!(events[2].1, vec![FieldValue::Varint(3)]);
872        assert_eq!(events[3].0, Some(ts4));
873        assert_eq!(events[3].1, vec![FieldValue::Varint(4)]);
874    }
875
876    #[test]
877    fn encoder_new_to_writer() {
878        let mut buf = Vec::new();
879        let enc = Encoder::new_to(&mut buf).unwrap();
880        drop(enc);
881        assert!(buf.len() >= 5);
882        assert_eq!(&buf[..5], &[0x54, 0x52, 0x43, 0x00, 1]);
883    }
884
885    #[test]
886    fn decoder_into_encoder_appends_without_header() {
887        use crate::decoder::{DecodedFrame, Decoder};
888
889        // Create a trace with a header, a schema, and an event
890        let mut enc = Encoder::new();
891        let schema = enc
892            .register_schema(
893                "Ev",
894                vec![FieldDef {
895                    name: "v".into(),
896                    field_type: FieldType::Varint,
897                }],
898            )
899            .unwrap();
900        enc.write_event(&schema, &[FieldValue::Varint(1_000), FieldValue::Varint(1)])
901            .unwrap();
902        let base = enc.finish();
903
904        // Decode all frames, then convert into an encoder that appends to output
905        let mut decoder = Decoder::new(&base).unwrap();
906        while decoder.next_frame_ref().ok().flatten().is_some() {}
907        let mut output = Vec::new();
908        let mut ext = decoder.into_encoder(&mut output);
909        // Schema "Ev" is already known — no duplicate schema frame emitted
910        ext.write_event(&schema, &[FieldValue::Varint(2_000), FieldValue::Varint(2)])
911            .unwrap();
912        drop(ext);
913
914        // Concatenate and decode
915        let mut combined = base.clone();
916        combined.extend_from_slice(&output);
917        let mut dec = Decoder::new(&combined).unwrap();
918        let events: Vec<_> = dec
919            .decode_all()
920            .into_iter()
921            .filter_map(|f| match f {
922                DecodedFrame::Event {
923                    timestamp_ns,
924                    values,
925                    ..
926                } => Some((timestamp_ns, values)),
927                _ => None,
928            })
929            .collect();
930        assert_eq!(events.len(), 2);
931        assert_eq!(events[0].0, Some(1_000));
932        assert_eq!(events[1].0, Some(2_000));
933    }
934
935    #[test]
936    fn decoder_into_encoder_deduplicates_interned_strings() {
937        use crate::decoder::{DecodedFrame, Decoder};
938
939        // Create a trace with an interned string
940        let mut enc = Encoder::new();
941        let id1 = enc.intern_string("hello").unwrap();
942        let base = enc.finish();
943
944        // Decode all frames, then convert into an encoder
945        let mut decoder = Decoder::new(&base).unwrap();
946        while decoder.next_frame_ref().ok().flatten().is_some() {}
947        let mut output = Vec::new();
948        let mut ext = decoder.into_encoder(&mut output);
949        // "hello" is already interned, should reuse the same ID
950        let id2 = ext.intern_string("hello").unwrap();
951        let id3 = ext.intern_string("world").unwrap();
952        drop(ext);
953
954        assert_eq!(id1, id2, "existing string should reuse pool ID");
955        assert_ne!(id2, id3);
956
957        // "hello" should not produce a new StringPool frame; "world" should
958        let mut combined = base.clone();
959        combined.extend_from_slice(&output);
960        let mut dec = Decoder::new(&combined).unwrap();
961        let frames = dec.decode_all();
962        let pool_frames: Vec<_> = frames
963            .iter()
964            .filter(|f| matches!(f, DecodedFrame::StringPool(_)))
965            .collect();
966        // One from the base trace ("hello"), one from extend ("world")
967        assert_eq!(pool_frames.len(), 2);
968    }
969
970    #[test]
971    fn register_and_write() {
972        use crate::decoder::{DecodedFrame, Decoder};
973
974        let mut enc = Encoder::new();
975        let schema = enc
976            .register_schema(
977                "MyEvent",
978                vec![
979                    FieldDef {
980                        name: "count".into(),
981                        field_type: FieldType::Varint,
982                    },
983                    FieldDef {
984                        name: "name".into(),
985                        field_type: FieldType::String,
986                    },
987                ],
988            )
989            .unwrap();
990
991        enc.write_event(
992            &schema,
993            &[
994                FieldValue::Varint(1_000_000),
995                FieldValue::Varint(42),
996                FieldValue::String("hello".into()),
997            ],
998        )
999        .unwrap();
1000
1001        let bytes = enc.finish();
1002        let mut dec = Decoder::new(&bytes).unwrap();
1003        let frames = dec.decode_all();
1004        let events: Vec<_> = frames
1005            .into_iter()
1006            .filter_map(|f| match f {
1007                DecodedFrame::Event {
1008                    timestamp_ns,
1009                    values,
1010                    ..
1011                } => Some((timestamp_ns, values)),
1012                _ => None,
1013            })
1014            .collect();
1015        assert_eq!(events.len(), 1);
1016        assert_eq!(events[0].0, Some(1_000_000));
1017        assert_eq!(events[0].1[0], FieldValue::Varint(42));
1018        assert_eq!(events[0].1[1], FieldValue::String("hello".into()));
1019    }
1020
1021    #[test]
1022    fn register_conflict_errors() {
1023        let mut enc = Encoder::new();
1024        enc.register_schema(
1025            "Ev",
1026            vec![FieldDef {
1027                name: "v".into(),
1028                field_type: FieldType::Varint,
1029            }],
1030        )
1031        .unwrap();
1032        let result = enc.register_schema(
1033            "Ev",
1034            vec![FieldDef {
1035                name: "other".into(),
1036                field_type: FieldType::Bool,
1037            }],
1038        );
1039        assert!(result.is_err());
1040    }
1041
1042    #[test]
1043    fn write_wrong_field_count_errors() {
1044        let mut enc = Encoder::new();
1045        let schema = enc
1046            .register_schema(
1047                "Ev",
1048                vec![FieldDef {
1049                    name: "v".into(),
1050                    field_type: FieldType::Varint,
1051                }],
1052            )
1053            .unwrap();
1054        // Pass 3 values (ts + 2 fields) for a 1-field schema
1055        let result = enc.write_event(
1056            &schema,
1057            &[
1058                FieldValue::Varint(0),
1059                FieldValue::Varint(1),
1060                FieldValue::Varint(2),
1061            ],
1062        );
1063        assert!(result.is_err());
1064    }
1065
1066    /// Verify that the encoder advances the timestamp base after each event,
1067    /// producing inter-event deltas rather than base-relative deltas.
1068    #[test]
1069    fn timestamp_base_advances_per_event() {
1070        use crate::decoder::{DecodedFrame, Decoder};
1071
1072        let mut enc = Encoder::new();
1073        let schema = enc
1074            .register_schema(
1075                "Ev",
1076                vec![FieldDef {
1077                    name: "v".into(),
1078                    field_type: FieldType::Varint,
1079                }],
1080            )
1081            .unwrap();
1082
1083        let ts1 = 12_000_000u64;
1084        let ts2 = 24_000_000u64;
1085        enc.write_event(&schema, &[FieldValue::Varint(ts1), FieldValue::Varint(1)])
1086            .unwrap();
1087        enc.write_event(&schema, &[FieldValue::Varint(ts2), FieldValue::Varint(2)])
1088            .unwrap();
1089
1090        let bytes = enc.finish();
1091
1092        let reset_count = bytes.iter().filter(|&&b| b == 0x05).count();
1093        assert_eq!(
1094            reset_count, 0,
1095            "base should advance per event, avoiding unnecessary resets"
1096        );
1097
1098        let mut dec = Decoder::new(&bytes).unwrap();
1099        let events: Vec<_> = dec
1100            .decode_all()
1101            .into_iter()
1102            .filter_map(|f| match f {
1103                DecodedFrame::Event { timestamp_ns, .. } => timestamp_ns,
1104                _ => None,
1105            })
1106            .collect();
1107        assert_eq!(events, vec![ts1, ts2]);
1108    }
1109
1110    #[test]
1111    fn reset_to_preserves_capacity() {
1112        let mut enc = Encoder::new();
1113        for i in 0..100 {
1114            enc.intern_string(&format!("string_{}", i)).unwrap();
1115        }
1116        let cap_before = enc.string_pool.capacity();
1117        let _bytes = enc.reset_to(Vec::new());
1118        let cap_after = enc.string_pool.capacity();
1119        assert_eq!(
1120            cap_before, cap_after,
1121            "string_pool capacity should be preserved after reset_to"
1122        );
1123    }
1124
1125    #[test]
1126    fn reset_to_returns_old_data_and_clears_state() {
1127        use crate::decoder::{DecodedFrame, Decoder};
1128
1129        let mut enc = Encoder::new();
1130        let schema = enc
1131            .register_schema(
1132                "Ev",
1133                vec![FieldDef {
1134                    name: "v".into(),
1135                    field_type: FieldType::Varint,
1136                }],
1137            )
1138            .unwrap();
1139        enc.write_event(
1140            &schema,
1141            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
1142        )
1143        .unwrap();
1144        let _s = enc.intern_string("hello").unwrap();
1145
1146        let old_bytes_written = enc.bytes_written();
1147        assert!(old_bytes_written > 0);
1148
1149        // --- reset ---
1150        let old = enc.reset_to_infallible(Vec::new());
1151
1152        // Invariant 1: old writer contains the data we wrote (decodable)
1153        let mut dec = Decoder::new(&old).unwrap();
1154        let frames = dec.decode_all();
1155        assert!(frames.iter().any(|f| matches!(f, DecodedFrame::Schema(_))));
1156        assert!(
1157            frames
1158                .iter()
1159                .any(|f| matches!(f, DecodedFrame::Event { .. }))
1160        );
1161        assert!(
1162            frames
1163                .iter()
1164                .any(|f| matches!(f, DecodedFrame::StringPool(_)))
1165        );
1166
1167        // Invariant 2: bytes_written resets to just the header size
1168        assert!(
1169            enc.bytes_written() < old_bytes_written,
1170            "bytes_written should reset (got {} vs old {})",
1171            enc.bytes_written(),
1172            old_bytes_written
1173        );
1174
1175        // Invariant 3: schemas are cleared — same schema must re-register
1176        // (write_event auto-registers, so we verify a new schema frame appears)
1177        enc.write_event(
1178            &schema,
1179            &[FieldValue::Varint(2_000), FieldValue::Varint(99)],
1180        )
1181        .unwrap();
1182
1183        // Invariant 4: string pool is cleared — re-interning emits a new pool frame
1184        let _s2 = enc.intern_string("hello").unwrap();
1185
1186        // Invariant 5: new output is a valid standalone trace
1187        let new_bytes = enc.reset_to_infallible(Vec::new());
1188        let mut dec2 = Decoder::new(&new_bytes).unwrap();
1189        let new_frames = dec2.decode_all();
1190        // Must have its own schema definition (not relying on old encoder state)
1191        assert!(
1192            new_frames
1193                .iter()
1194                .any(|f| matches!(f, DecodedFrame::Schema(s) if s.name == "Ev")),
1195            "new trace must contain schema definition"
1196        );
1197        // Must have its own string pool entry
1198        assert!(
1199            new_frames
1200                .iter()
1201                .any(|f| matches!(f, DecodedFrame::StringPool(_))),
1202            "new trace must contain string pool"
1203        );
1204        // Event must decode with correct timestamp (timestamp_base was reset)
1205        let event = new_frames
1206            .iter()
1207            .find_map(|f| match f {
1208                DecodedFrame::Event {
1209                    timestamp_ns,
1210                    values,
1211                    ..
1212                } => Some((timestamp_ns, values)),
1213                _ => None,
1214            })
1215            .expect("new trace must contain event");
1216        assert_eq!(*event.0, Some(2_000));
1217        assert_eq!(event.1[0], FieldValue::Varint(99));
1218    }
1219
1220    #[test]
1221    fn into_raw_encoder_preserves_byte_count() {
1222        let mut enc = Encoder::new();
1223        let schema = enc
1224            .register_schema(
1225                "Ev",
1226                vec![FieldDef {
1227                    name: "v".into(),
1228                    field_type: FieldType::Varint,
1229                }],
1230            )
1231            .unwrap();
1232        enc.write_event(
1233            &schema,
1234            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
1235        )
1236        .unwrap();
1237
1238        let bytes_before = enc.bytes_written();
1239        assert!(bytes_before > 0);
1240
1241        let raw = enc.into_raw_encoder();
1242        assert_eq!(
1243            raw.bytes_written(),
1244            bytes_before,
1245            "byte count must be preserved across conversion"
1246        );
1247    }
1248
1249    #[test]
1250    fn raw_encoder_write_raw_and_bytes_written() {
1251        let enc = Encoder::new();
1252        let initial = enc.bytes_written();
1253        let mut raw = enc.into_raw_encoder();
1254
1255        let payload = [0xAA; 100];
1256        raw.write_raw(&payload).unwrap();
1257
1258        assert_eq!(
1259            raw.bytes_written(),
1260            initial + payload.len() as u64,
1261            "bytes_written must include raw payload"
1262        );
1263    }
1264
1265    #[test]
1266    fn raw_encoder_into_inner_returns_all_data() {
1267        use crate::decoder::{DecodedFrame, Decoder};
1268
1269        // Write a structured event via Encoder, then append a raw batch
1270        // via RawEncoder, and verify the combined output decodes correctly.
1271        let mut enc = Encoder::new();
1272        let schema = enc
1273            .register_schema(
1274                "Ev",
1275                vec![FieldDef {
1276                    name: "v".into(),
1277                    field_type: FieldType::Varint,
1278                }],
1279            )
1280            .unwrap();
1281        enc.write_event(&schema, &[FieldValue::Varint(1_000), FieldValue::Varint(1)])
1282            .unwrap();
1283
1284        // Build a raw batch with the same schema
1285        let raw_batch = {
1286            let mut batch_enc = Encoder::new();
1287            batch_enc
1288                .write_event(&schema, &[FieldValue::Varint(2_000), FieldValue::Varint(2)])
1289                .unwrap();
1290            batch_enc.finish()
1291        };
1292
1293        let mut raw = enc.into_raw_encoder();
1294        raw.write_raw(&raw_batch).unwrap();
1295        let combined = raw.into_inner();
1296
1297        let mut dec = Decoder::new(&combined).unwrap();
1298        let events: Vec<_> = dec
1299            .decode_all()
1300            .into_iter()
1301            .filter_map(|f| match f {
1302                DecodedFrame::Event {
1303                    timestamp_ns,
1304                    values,
1305                    ..
1306                } => Some((timestamp_ns, values)),
1307                _ => None,
1308            })
1309            .collect();
1310
1311        assert_eq!(events.len(), 2);
1312        assert_eq!(events[0].0, Some(1_000));
1313        assert_eq!(events[0].1, vec![FieldValue::Varint(1)]);
1314        assert_eq!(events[1].0, Some(2_000));
1315        assert_eq!(events[1].1, vec![FieldValue::Varint(2)]);
1316    }
1317}