Skip to main content

dial9_trace_format/
encoder.rs

1//! High-level encoder for writing trace files.
2//!
3//! [`Encoder`] writes the file header, registers schemas, interns strings, and
4//! encodes events with delta-compressed timestamps. It is the primary entry
5//! point for producing trace data.
6
7use crate::TraceEvent;
8use crate::codec::{self, PoolEntry, StackPoolEntry, WireTypeId};
9use crate::schema::{SchemaEntry, SchemaRegistry};
10use crate::types::{
11    CountingWriter, EncodeState, EventEncoder, InternedStackFrames, InternedString,
12};
13use std::any::TypeId;
14use std::collections::HashMap;
15use std::hash::{BuildHasherDefault, Hasher};
16use std::io::{self, Write};
17use std::sync::Arc;
18
/// A fast, non-cryptographic hasher using FxHash's rotate-xor-multiply
/// strategy (the algorithm used inside `rustc`).
///
/// It is much cheaper than the standard library's SipHash for the small keys
/// the encoder hashes (`TypeId`, `Arc<str>`, interned `String`s and
/// stack-frame slices). It is *not* resistant to hash flooding: keys chosen
/// by an adversary can be made to collide, so only use it where that
/// degrades performance rather than correctness or availability.
#[doc(hidden)]
#[derive(Default)]
pub struct FxHasher(u64);

impl FxHasher {
    /// Multiplicative constant of the FxHash algorithm.
    const MULTIPLIER: u64 = 0x517c_c1b7_2722_0a95;

    /// Mix one 64-bit word into the running state.
    #[inline]
    fn hash_word(&mut self, word: u64) {
        self.0 = (self.0.rotate_left(5) ^ word).wrapping_mul(Self::MULTIPLIER);
    }
}

impl Hasher for FxHasher {
    /// Hash a byte slice: full native-endian 8-byte words first, then at most
    /// one 4-byte word, then any remaining bytes one at a time.
    #[inline]
    fn write(&mut self, bytes: &[u8]) {
        let mut words = bytes.chunks_exact(8);
        for word in &mut words {
            self.hash_word(u64::from_ne_bytes(word.try_into().unwrap()));
        }
        let mut tail = words.remainder();
        if tail.len() >= 4 {
            let (half, rest) = tail.split_at(4);
            self.hash_word(u64::from(u32::from_ne_bytes(half.try_into().unwrap())));
            tail = rest;
        }
        for &byte in tail {
            self.hash_word(u64::from(byte));
        }
    }

    #[inline]
    fn write_u8(&mut self, i: u8) {
        self.hash_word(u64::from(i));
    }

    #[inline]
    fn write_u16(&mut self, i: u16) {
        self.hash_word(u64::from(i));
    }

    #[inline]
    fn write_u32(&mut self, i: u32) {
        self.hash_word(u64::from(i));
    }

    #[inline]
    fn write_u64(&mut self, i: u64) {
        self.hash_word(i);
    }

    #[inline]
    fn write_usize(&mut self, i: usize) {
        // usize is at most 64 bits on supported targets.
        self.hash_word(i as u64);
    }

    /// Hash a `u128` as two words: low half first, then high half.
    #[inline]
    fn write_u128(&mut self, i: u128) {
        let low = i as u64;
        let high = (i >> 64) as u64;
        self.hash_word(low);
        self.hash_word(high);
    }

    #[inline]
    fn finish(&self) -> u64 {
        self.0
    }
}

/// `BuildHasher` producing default-initialized [`FxHasher`]s.
#[doc(hidden)]
pub type FxBuildHasher = BuildHasherDefault<FxHasher>;
/// `HashMap` that hashes with [`FxHasher`] instead of SipHash.
#[doc(hidden)]
pub type FxHashMap<K, V> = HashMap<K, V, FxBuildHasher>;
91
92/// A schema handle returned by [`Encoder::register_schema`] or created via
93/// [`Schema::new`].
94///
95/// Carries the full schema definition (name + fields) so it can auto-register
96/// itself with any encoder on first use. This means a `Schema` created on one
97/// encoder can be passed to a different encoder and it will just work.
98///
99/// `Schema` is cheap to clone (internally `Arc`-backed).
100#[derive(Clone, Debug)]
101pub struct Schema {
102    pub(crate) entry: Arc<SchemaEntry>,
103    /// Pre-computed `Arc<str>` of the schema name, used as a cheap HashMap key
104    /// (clone is a pointer bump instead of a String allocation).
105    name_key: Arc<str>,
106}
107
108impl Schema {
109    /// Create a schema handle without an encoder.
110    ///
111    /// The schema will be lazily registered the first time it is passed to
112    /// [`Encoder::write_event`].
113    pub fn new(name: &str, fields: Vec<crate::schema::FieldDef>) -> Self {
114        let name_key: Arc<str> = Arc::from(name);
115        Self {
116            entry: Arc::new(SchemaEntry {
117                name: name.to_string(),
118                has_timestamp: true,
119                fields,
120                annotations: Vec::new(),
121            }),
122            name_key,
123        }
124    }
125
126    /// Create a schema handle from a complete [`SchemaEntry`].
127    pub fn from_entry(entry: SchemaEntry) -> Self {
128        let name_key: Arc<str> = Arc::from(entry.name.as_str());
129        Self {
130            entry: Arc::new(entry),
131            name_key,
132        }
133    }
134
135    /// Schema name.
136    pub fn name(&self) -> &str {
137        &self.entry.name
138    }
139
140    /// Schema field definitions.
141    pub fn fields(&self) -> &[crate::schema::FieldDef] {
142        &self.entry.fields
143    }
144}
145
/// Lookup key for an encoder's schema-id table.
///
/// Every registered schema is indexed by its name. The derive path
/// (`Encoder::write`) additionally caches the wire id under the Rust
/// `TypeId` of the event type, so repeat writes skip the name lookup.
#[derive(Clone, Hash, PartialEq, Eq)]
enum SchemaKey {
    /// Indexed by schema name (manual registration and auto-registration).
    Name(Arc<str>),
    /// Indexed by the Rust type of a derived `TraceEvent`.
    RustType(TypeId),
}
153
154/// Trace file encoder.
155///
156/// Writes the binary file header, registers event schemas, interns strings
157/// into a pool, and encodes events with delta-compressed timestamps.
158///
159/// The default type parameter (`Vec<u8>`) buffers everything in memory;
160/// use [`Encoder::new_to`] to write to an arbitrary [`Write`] sink.
161pub struct Encoder<W: Write = Vec<u8>> {
162    state: EncodeState<W>,
163    registry: SchemaRegistry,
164    string_pool: FxHashMap<String, u32>,
165    next_pool_id: u32,
166    stack_pool: FxHashMap<Box<[u64]>, u32>,
167    next_stack_pool_id: u32,
168    schema_ids: FxHashMap<SchemaKey, WireTypeId>,
169}
170
171impl Default for Encoder<Vec<u8>> {
172    fn default() -> Self {
173        Self::new()
174    }
175}
176
177impl Encoder<Vec<u8>> {
178    pub fn new() -> Self {
179        let mut buf = Vec::new();
180        codec::encode_header(&mut buf).expect("Vec::write_all cannot fail");
181        Self {
182            state: EncodeState::new(buf),
183            registry: SchemaRegistry::new(),
184            string_pool: FxHashMap::default(),
185            next_pool_id: 0,
186            stack_pool: FxHashMap::default(),
187            next_stack_pool_id: 0,
188            schema_ids: FxHashMap::default(),
189        }
190    }
191
192    /// Consume the encoder and return the encoded bytes.
193    pub fn finish(self) -> Vec<u8> {
194        self.state.writer.into_inner()
195    }
196}
197
198impl<W: Write> Encoder<W> {
199    /// Create an encoder that writes to an arbitrary writer.
200    /// Writes the file header immediately.
201    pub fn new_to(mut writer: W) -> io::Result<Self> {
202        codec::encode_header(&mut writer)?;
203        Ok(Self {
204            state: EncodeState::new(writer),
205            registry: SchemaRegistry::new(),
206            string_pool: FxHashMap::default(),
207            next_pool_id: 0,
208            stack_pool: FxHashMap::default(),
209            next_stack_pool_id: 0,
210            schema_ids: FxHashMap::default(),
211        })
212    }
213
214    /// Create an encoder seeded from decoded state. Used by
215    /// [`Decoder::into_encoder`](crate::decoder::Decoder::into_encoder).
216    pub(crate) fn from_decoder(
217        mut registry: SchemaRegistry,
218        string_pool: crate::decoder::StringPool,
219        stack_pool: crate::decoder::StackPool,
220        timestamp_base_ns: u64,
221        writer: W,
222    ) -> Self {
223        let mut pool = FxHashMap::default();
224        let mut next_pool_id: u32 = 0;
225        for (id, value) in string_pool.0.into_iter() {
226            pool.insert(value, id.raw_id());
227            if id.raw_id() >= next_pool_id {
228                next_pool_id = id.raw_id() + 1;
229            }
230        }
231
232        let mut new_stack_pool: FxHashMap<Box<[u64]>, u32> = FxHashMap::default();
233        let mut next_stack_pool_id: u32 = 0;
234        for (id, frames) in stack_pool.0.into_iter() {
235            new_stack_pool.insert(frames.into_boxed_slice(), id.raw_id());
236            if id.raw_id() >= next_stack_pool_id {
237                next_stack_pool_id = id.raw_id() + 1;
238            }
239        }
240
241        let mut schema_ids = FxHashMap::default();
242        for (wire_id, entry) in registry.entries() {
243            schema_ids.insert(SchemaKey::Name(Arc::from(entry.name.as_str())), wire_id);
244        }
245        registry.sync_next_id();
246
247        let mut state = EncodeState::new(writer);
248        state.set_ts_base_unchecked(timestamp_base_ns);
249
250        Self {
251            state,
252            registry,
253            string_pool: pool,
254            next_pool_id,
255            stack_pool: new_stack_pool,
256            next_stack_pool_id,
257            schema_ids,
258        }
259    }
260
261    /// Consume the encoder and return the inner writer.
262    pub fn into_inner(self) -> W {
263        self.state.writer.into_inner()
264    }
265
266    /// Borrow the inner writer.
267    pub fn as_inner(&self) -> &W {
268        self.state.writer.inner()
269    }
270
271    /// Total bytes written through this encoder (including the file header).
272    pub fn bytes_written(&self) -> u64 {
273        self.state.writer.bytes_written()
274    }
275
276    /// Reset the encoder to a new writer, preserving internal allocations.
277    /// Returns the old writer. Writes a file header to the new writer.
278    pub fn reset_to(&mut self, mut new_writer: W) -> io::Result<W> {
279        codec::encode_header(&mut new_writer)?;
280        self.string_pool.clear();
281        self.next_pool_id = 0;
282        self.stack_pool.clear();
283        self.next_stack_pool_id = 0;
284        self.registry.clear();
285        self.schema_ids.clear();
286        // creating a new EncodeState resets the timestamp delta
287        let old_state = std::mem::replace(&mut self.state, EncodeState::new(new_writer));
288        Ok(old_state.writer.into_inner())
289    }
290
291    /// Ensure a schema is registered with this encoder. Returns the wire type
292    /// ID for this encoder's output stream.
293    ///
294    /// Idempotent if the schema matches. Errors if a different schema was
295    /// already registered under the same name.
296    fn ensure_registered(&mut self, schema: &Schema) -> io::Result<WireTypeId> {
297        let key = SchemaKey::Name(Arc::clone(&schema.name_key));
298        if let Some(&wire_id) = self.schema_ids.get(&key) {
299            // TODO: unify registry and schema_ids to avoid this error case
300            let Some(existing) = self.registry.get(wire_id) else {
301                return Err(io::Error::other(format!(
302                    "corrupted internal state. {wire_id:?} in schema_ids but not in registry."
303                )));
304            };
305            if *existing == *schema.entry {
306                return Ok(wire_id);
307            }
308            return Err(io::Error::new(
309                io::ErrorKind::InvalidInput,
310                format!(
311                    "schema already registered with different definition: {}",
312                    schema.name()
313                ),
314            ));
315        }
316        let id = self.registry.next_type_id();
317        codec::encode_schema(id, &schema.entry, &mut self.state.writer)?;
318        if !schema.entry.annotations.is_empty() {
319            codec::encode_schema_annotations(
320                id,
321                &schema.entry.annotations,
322                &mut self.state.writer,
323            )?;
324        }
325        self.registry
326            .register(id, (*schema.entry).clone())
327            .expect("schema registration failed");
328        self.schema_ids.insert(key, id);
329        Ok(id)
330    }
331
332    /// Register a schema by name. Returns a [`Schema`] handle that can be
333    /// passed to [`write_event`](Self::write_event) (on this or any other
334    /// encoder).
335    ///
336    /// All schemas have timestamps. When writing events, the first element of
337    /// `values` must be `FieldValue::Varint(timestamp_ns)`. It is extracted and
338    /// encoded in the event header (not as a regular field).
339    ///
340    /// Eagerly writes the schema frame. Idempotent if the definition matches.
341    pub fn register_schema(
342        &mut self,
343        name: &str,
344        fields: Vec<crate::schema::FieldDef>,
345    ) -> io::Result<Schema> {
346        let schema = Schema::new(name, fields);
347        self.ensure_registered(&schema)?;
348        Ok(schema)
349    }
350
351    /// Register a pre-built [`Schema`] handle with this encoder.
352    ///
353    /// Eagerly writes the schema frame (and annotation frame if annotations
354    /// are present). Idempotent if the definition matches.
355    pub fn register_existing(&mut self, schema: &Schema) -> io::Result<WireTypeId> {
356        self.ensure_registered(schema)
357    }
358
359    /// Write an event for a schema.
360    ///
361    /// The first element of `values` must be `FieldValue::Varint(timestamp_ns)`
362    /// — it is extracted and encoded in the event header, not as a regular
363    /// field. The remaining values must match the schema's field count.
364    ///
365    /// If this encoder hasn't seen `schema` before, it is auto-registered
366    /// (the schema frame is written before the event).
367    pub fn write_event(
368        &mut self,
369        schema: &Schema,
370        values: &[crate::types::FieldValue],
371    ) -> io::Result<()> {
372        use crate::types::FieldValue;
373
374        let type_id = self.ensure_registered(schema)?;
375        let expected_fields = schema.entry.fields.len();
376
377        let ts_ns = match values.first() {
378            Some(FieldValue::Varint(ns)) => *ns,
379            _ => {
380                return Err(io::Error::new(
381                    io::ErrorKind::InvalidInput,
382                    "first value must be FieldValue::Varint(timestamp_ns)",
383                ));
384            }
385        };
386        let field_values = &values[1..];
387
388        if field_values.len() != expected_fields {
389            return Err(io::Error::new(
390                io::ErrorKind::InvalidInput,
391                format!(
392                    "value count ({}) does not match schema field count ({}) for schema '{}'",
393                    field_values.len(),
394                    expected_fields,
395                    schema.name(),
396                ),
397            ));
398        }
399
400        let ts_delta = self.state.encode_timestamp_delta(ts_ns)?;
401        self.state.writer.write_all(&[codec::TAG_EVENT])?;
402        self.state.writer.write_all(&type_id.0.to_le_bytes())?;
403        codec::encode_u24_le(ts_delta, &mut self.state.writer)?;
404        let mut enc = EventEncoder::new(&mut self.state);
405        for (i, v) in field_values.iter().enumerate() {
406            enc.write_field_value(v, schema.entry.fields[i].field_type)?;
407        }
408        Ok(())
409    }
410
411    /// Write a derived TraceEvent. Auto-registers the schema on first call for this type.
412    /// Handles timestamp encoding: emits TimestampReset if needed, packs u24 delta in header.
413    pub fn write<T: TraceEvent + 'static>(&mut self, event: &T) -> io::Result<()> {
414        let key = SchemaKey::RustType(TypeId::of::<T>());
415        let tid = if let Some(&cached) = self.schema_ids.get(&key) {
416            cached
417        } else {
418            let entry = T::schema_entry();
419            let schema = Schema::new(&entry.name, entry.fields);
420            let id = self.ensure_registered(&schema)?;
421            self.schema_ids.insert(key, id);
422            id
423        };
424        let ts_ns = event.timestamp();
425        let ts_delta = self.state.encode_timestamp_delta(ts_ns)?;
426        self.state.writer.write_all(&[codec::TAG_EVENT])?;
427        self.state.writer.write_all(&tid.0.to_le_bytes())?;
428        codec::encode_u24_le(ts_delta, &mut self.state.writer)?;
429        let mut enc = EventEncoder::new(&mut self.state);
430        event.encode_fields(&mut enc)
431    }
432
433    /// Intern a string, emitting a pool frame if new. Returns an [`InternedString`] handle.
434    pub fn intern_string(&mut self, s: &str) -> io::Result<InternedString> {
435        if let Some(&id) = self.string_pool.get(s) {
436            return Ok(InternedString(id));
437        }
438        let id = self.next_pool_id;
439        self.next_pool_id += 1;
440        self.string_pool.insert(s.to_string(), id);
441        codec::encode_string_pool(
442            &[PoolEntry {
443                pool_id: id,
444                data: s.as_bytes().to_vec(),
445            }],
446            &mut self.state.writer,
447        )?;
448        Ok(InternedString(id))
449    }
450
451    pub fn write_string_pool(&mut self, entries: &[PoolEntry]) -> io::Result<()> {
452        codec::encode_string_pool(entries, &mut self.state.writer)
453    }
454
455    /// Intern a stack-frame vector, emitting a stack-pool frame if new.
456    /// Returns an [`InternedStackFrames`] handle.
457    pub fn intern_stack_frames(&mut self, frames: &[u64]) -> io::Result<InternedStackFrames> {
458        if let Some(&id) = self.stack_pool.get(frames) {
459            return Ok(InternedStackFrames(id));
460        }
461        let id = self.next_stack_pool_id;
462        self.next_stack_pool_id += 1;
463        self.stack_pool.insert(frames.into(), id);
464        codec::encode_stack_pool(
465            &[StackPoolEntry {
466                pool_id: id,
467                // TODO: allow `StackPoolEntry` to have borrowed frames avoiding the unecessary clone here
468                // https://github.com/dial9-rs/dial9-tokio-telemetry/issues/358
469                frames: frames.to_vec(),
470            }],
471            &mut self.state.writer,
472        )?;
473        Ok(InternedStackFrames(id))
474    }
475
476    pub fn write_stack_pool(&mut self, entries: &[StackPoolEntry]) -> io::Result<()> {
477        codec::encode_stack_pool(entries, &mut self.state.writer)
478    }
479
480    /// Flush the underlying writer.
481    pub fn flush(&mut self) -> io::Result<()> {
482        self.state.writer.flush()
483    }
484
485    /// Convert this encoder into a [`RawEncoder`] that only supports writing
486    /// pre-encoded bytes. The byte count is preserved so rotation decisions
487    /// remain correct.
488    ///
489    /// Use this after writing any structured data (headers, segment metadata)
490    /// to switch to a raw-only mode for appending pre-encoded batches.
491    pub fn into_raw_encoder(self) -> RawEncoder<W> {
492        RawEncoder {
493            writer: self.state.writer,
494        }
495    }
496}
497
498/// A write-only encoder that accepts pre-encoded bytes.
499///
500/// Created by [`Encoder::into_raw_encoder`] after the file header and any
501/// structured metadata have been written. Carries no schema registry, string
502/// pool, or timestamp state — it simply forwards bytes to the underlying
503/// writer while tracking the total byte count.
504pub struct RawEncoder<W> {
505    writer: CountingWriter<W>,
506}
507
508impl<W: Write> RawEncoder<W> {
509    /// Write pre-encoded bytes to the underlying writer.
510    pub fn write_raw(&mut self, bytes: &[u8]) -> io::Result<()> {
511        self.writer.write_all(bytes)
512    }
513
514    /// Total bytes written (including bytes written by the [`Encoder`] before
515    /// conversion).
516    pub fn bytes_written(&self) -> u64 {
517        self.writer.bytes_written()
518    }
519
520    /// Flush the underlying writer.
521    pub fn flush(&mut self) -> io::Result<()> {
522        self.writer.flush()
523    }
524
525    /// Consume the raw encoder and return the inner writer.
526    pub fn into_inner(self) -> W {
527        self.writer.into_inner()
528    }
529}
530
531impl Encoder<Vec<u8>> {
532    pub fn write_infallible<T: TraceEvent + 'static>(&mut self, event: &T) {
533        self.write(event).expect("writing to Vec<u8> is infallible")
534    }
535
536    pub fn intern_string_infallible(&mut self, s: &str) -> InternedString {
537        self.intern_string(s)
538            .expect("interning into Vec<u8> is infallible")
539    }
540
541    pub fn intern_stack_frames_infallible(&mut self, frames: &[u64]) -> InternedStackFrames {
542        self.intern_stack_frames(frames)
543            .expect("interning into Vec<u8> is infallible")
544    }
545
546    /// Resets the encoder to point to a new backing Vec returning the old one
547    pub fn reset_to_infallible(&mut self, data: Vec<u8>) -> Vec<u8> {
548        self.reset_to(data)
549            .expect("writing to Vec<u8> is infallible")
550    }
551}
552
553#[cfg(test)]
554mod tests {
555    use super::*;
556    use crate::schema::FieldDef;
557    use crate::types::{FieldType, FieldValue};
558
559    #[test]
560    fn encoder_writes_header() {
561        let enc = Encoder::new();
562        let data = enc.finish();
563        assert_eq!(&data[..5], &[0x54, 0x52, 0x43, 0x00, 1]);
564    }
565
566    #[test]
567    fn encoder_register_and_write_event() {
568        let mut enc = Encoder::new();
569        let schema = enc
570            .register_schema(
571                "Ev",
572                vec![FieldDef {
573                    name: "v".into(),
574                    field_type: FieldType::Varint,
575                }],
576            )
577            .unwrap();
578        enc.write_event(
579            &schema,
580            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
581        )
582        .unwrap();
583        let data = enc.finish();
584        assert!(data.len() > 5);
585    }
586
587    #[test]
588    fn idempotent_re_registration() {
589        let mut enc = Encoder::new();
590        let fields = vec![FieldDef {
591            name: "v".into(),
592            field_type: FieldType::Varint,
593        }];
594        let _s1 = enc.register_schema("Ev", fields.clone()).unwrap();
595        let _s2 = enc.register_schema("Ev", fields).unwrap();
596        // Both succeed — same schema, same name
597    }
598
599    #[test]
600    fn re_registration_different_schema_errors() {
601        let mut enc = Encoder::new();
602        enc.register_schema(
603            "Ev",
604            vec![FieldDef {
605                name: "v".into(),
606                field_type: FieldType::Varint,
607            }],
608        )
609        .unwrap();
610        let result = enc.register_schema(
611            "Ev",
612            vec![FieldDef {
613                name: "different".into(),
614                field_type: FieldType::Bool,
615            }],
616        );
617        assert!(result.is_err());
618    }
619
620    #[test]
621    fn schema_auto_registers_on_write() {
622        use crate::decoder::{DecodedFrame, Decoder};
623
624        // Create a schema without an encoder
625        let schema = Schema::new(
626            "Lazy",
627            vec![FieldDef {
628                name: "v".into(),
629                field_type: FieldType::Varint,
630            }],
631        );
632
633        // Write to an encoder that hasn't seen this schema — auto-registers
634        let mut enc = Encoder::new();
635        enc.write_event(
636            &schema,
637            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
638        )
639        .unwrap();
640
641        let bytes = enc.finish();
642        let mut dec = Decoder::new(&bytes).unwrap();
643        let frames = dec.decode_all();
644        assert!(matches!(&frames[0], DecodedFrame::Schema(s) if s.name == "Lazy"));
645        if let DecodedFrame::Event { values, .. } = &frames[1] {
646            assert_eq!(*values, vec![FieldValue::Varint(42)]);
647        } else {
648            panic!("expected event");
649        }
650    }
651
652    #[test]
653    fn schema_portable_across_encoders() {
654        use crate::decoder::{DecodedFrame, Decoder};
655
656        let mut enc1 = Encoder::new();
657        let schema = enc1
658            .register_schema(
659                "Shared",
660                vec![FieldDef {
661                    name: "v".into(),
662                    field_type: FieldType::Varint,
663                }],
664            )
665            .unwrap();
666        enc1.write_event(&schema, &[FieldValue::Varint(1_000), FieldValue::Varint(1)])
667            .unwrap();
668
669        // Pass the same Schema to a different encoder
670        let mut enc2 = Encoder::new();
671        enc2.write_event(&schema, &[FieldValue::Varint(2_000), FieldValue::Varint(2)])
672            .unwrap();
673
674        // Both encoders produce valid output
675        for (enc, expected_val) in [(enc1, 1u64), (enc2, 2u64)] {
676            let bytes = enc.finish();
677            let mut dec = Decoder::new(&bytes).unwrap();
678            let frames = dec.decode_all();
679            let event = frames
680                .iter()
681                .find(|f| matches!(f, DecodedFrame::Event { .. }))
682                .unwrap();
683            if let DecodedFrame::Event { values, .. } = event {
684                assert_eq!(values[0], FieldValue::Varint(expected_val));
685            }
686        }
687    }
688
689    #[test]
690    fn encoder_intern_string_deduplicates() {
691        let mut enc = Encoder::new();
692        let id1 = enc.intern_string("hello").unwrap();
693        let id2 = enc.intern_string("hello").unwrap();
694        let id3 = enc.intern_string("world").unwrap();
695        assert_eq!(id1, id2);
696        assert_ne!(id1, id3);
697    }
698
699    #[test]
700    fn encoder_intern_stack_frames_deduplicates() {
701        let mut enc = Encoder::new();
702        let stack_a: &[u64] = &[0x1000, 0x2000, 0x3000];
703        let stack_b: &[u64] = &[0x4000, 0x5000];
704        let id1 = enc.intern_stack_frames(stack_a).unwrap();
705        let id2 = enc.intern_stack_frames(stack_a).unwrap();
706        let id3 = enc.intern_stack_frames(stack_b).unwrap();
707        assert_eq!(id1, id2);
708        assert_ne!(id1, id3);
709    }
710
711    #[test]
712    fn stack_pool_round_trip_via_decoder() {
713        use crate::decoder::Decoder;
714        use crate::types::InternedStackFrames;
715
716        let mut enc = Encoder::new();
717        let stack_a: &[u64] = &[0xdead, 0xbeef, 0xcafe];
718        let stack_b: &[u64] = &[0x1, 0x2];
719        let id_a = enc.intern_stack_frames(stack_a).unwrap();
720        let id_b = enc.intern_stack_frames(stack_b).unwrap();
721        let bytes = enc.finish();
722
723        let mut dec = Decoder::new(&bytes).unwrap();
724        let _ = dec.decode_all();
725        assert_eq!(
726            dec.stack_pool().get(InternedStackFrames(id_a.raw_id())),
727            Some(stack_a)
728        );
729        assert_eq!(
730            dec.stack_pool().get(InternedStackFrames(id_b.raw_id())),
731            Some(stack_b)
732        );
733    }
734
735    #[test]
736    fn for_each_event_populates_stack_pool() {
737        use crate::decoder::Decoder;
738        use crate::schema::FieldDef;
739        use crate::types::{FieldType, FieldValue, InternedStackFrames};
740
741        let mut enc = Encoder::new();
742        let schema = enc
743            .register_schema(
744                "CpuSampleEvent",
745                vec![FieldDef {
746                    name: "callchain".into(),
747                    field_type: FieldType::PooledStackFrames,
748                }],
749            )
750            .unwrap();
751        let stack: &[u64] = &[0x1234, 0x5678, 0x9abc];
752        let id = enc.intern_stack_frames(stack).unwrap();
753        enc.write_event(
754            &schema,
755            &[
756                FieldValue::Varint(1_000_000),
757                FieldValue::PooledStackFrames(id),
758            ],
759        )
760        .unwrap();
761        let bytes = enc.finish();
762
763        let mut dec = Decoder::new(&bytes).unwrap();
764        let mut event_count = 0;
765        dec.for_each_event(|_ev| {
766            event_count += 1;
767        })
768        .unwrap();
769        assert_eq!(event_count, 1);
770        assert_eq!(
771            dec.stack_pool().get(InternedStackFrames(id.raw_id())),
772            Some(stack),
773        );
774    }
775
776    #[test]
777    fn encoder_intern_empty_stack_frames() {
778        use crate::decoder::Decoder;
779        use crate::types::InternedStackFrames;
780
781        let mut enc = Encoder::new();
782        let id1 = enc.intern_stack_frames(&[]).unwrap();
783        let id2 = enc.intern_stack_frames(&[]).unwrap();
784        assert_eq!(id1, id2);
785        let bytes = enc.finish();
786
787        let mut dec = Decoder::new(&bytes).unwrap();
788        let _ = dec.decode_all();
789        assert_eq!(
790            dec.stack_pool().get(InternedStackFrames(id1.raw_id())),
791            Some(&[][..])
792        );
793    }
794
795    #[test]
796    fn write_stack_pool_multi_entry_round_trip() {
797        use crate::decoder::Decoder;
798        use crate::types::InternedStackFrames;
799
800        let mut enc = Encoder::new();
801        let entries = vec![
802            StackPoolEntry {
803                pool_id: 0,
804                frames: vec![0xaaaa, 0xbbbb, 0xcccc],
805            },
806            StackPoolEntry {
807                pool_id: 1,
808                frames: vec![0x1111],
809            },
810            StackPoolEntry {
811                pool_id: 2,
812                frames: vec![],
813            },
814        ];
815        enc.write_stack_pool(&entries).unwrap();
816        let bytes = enc.finish();
817
818        let mut dec = Decoder::new(&bytes).unwrap();
819        let _ = dec.decode_all();
820        assert_eq!(
821            dec.stack_pool().get(InternedStackFrames(0)),
822            Some(&[0xaaaa, 0xbbbb, 0xcccc][..])
823        );
824        assert_eq!(
825            dec.stack_pool().get(InternedStackFrames(1)),
826            Some(&[0x1111][..])
827        );
828        assert_eq!(dec.stack_pool().get(InternedStackFrames(2)), Some(&[][..]));
829    }
830
831    #[test]
832    fn decoder_into_encoder_deduplicates_interned_stack_frames() {
833        use crate::decoder::Decoder;
834
835        let mut enc = Encoder::new();
836        let id1 = enc.intern_stack_frames(&[0x10, 0x20]).unwrap();
837        let base = enc.finish();
838
839        let mut decoder = Decoder::new(&base).unwrap();
840        while decoder.next_frame_ref().ok().flatten().is_some() {}
841        let mut output = Vec::new();
842        let mut ext = decoder.into_encoder(&mut output);
843        let id2 = ext.intern_stack_frames(&[0x10, 0x20]).unwrap();
844        let id3 = ext.intern_stack_frames(&[0x30]).unwrap();
845        assert_eq!(id1.raw_id(), id2.raw_id());
846        assert_ne!(id2.raw_id(), id3.raw_id());
847    }
848
849    #[test]
850    fn timestamp_round_trip() {
851        use crate::decoder::{DecodedFrame, Decoder};
852
853        let mut enc = Encoder::new();
854        let schema = enc
855            .register_schema(
856                "TS",
857                vec![FieldDef {
858                    name: "v".into(),
859                    field_type: FieldType::Varint,
860                }],
861            )
862            .unwrap();
863
864        let ts1 = 100_000u64;
865        let ts2 = 50_000u64;
866        let ts3 = 200_000_000u64;
867        let ts4 = 100_000_000u64;
868        enc.write_event(&schema, &[FieldValue::Varint(ts1), FieldValue::Varint(1)])
869            .unwrap();
870        enc.write_event(&schema, &[FieldValue::Varint(ts2), FieldValue::Varint(2)])
871            .unwrap();
872        enc.write_event(&schema, &[FieldValue::Varint(ts3), FieldValue::Varint(3)])
873            .unwrap();
874        enc.write_event(&schema, &[FieldValue::Varint(ts4), FieldValue::Varint(4)])
875            .unwrap();
876
877        let bytes = enc.finish();
878        let mut dec = Decoder::new(&bytes).unwrap();
879        let events: Vec<_> = dec
880            .decode_all()
881            .into_iter()
882            .filter_map(|f| match f {
883                DecodedFrame::Event {
884                    timestamp_ns,
885                    values,
886                    ..
887                } => Some((timestamp_ns, values)),
888                _ => None,
889            })
890            .collect();
891
892        assert_eq!(events.len(), 4);
893        assert_eq!(events[0].0, Some(ts1));
894        assert_eq!(events[0].1, vec![FieldValue::Varint(1)]);
895        assert_eq!(events[1].0, Some(ts2));
896        assert_eq!(events[1].1, vec![FieldValue::Varint(2)]);
897        assert_eq!(events[2].0, Some(ts3));
898        assert_eq!(events[2].1, vec![FieldValue::Varint(3)]);
899        assert_eq!(events[3].0, Some(ts4));
900        assert_eq!(events[3].1, vec![FieldValue::Varint(4)]);
901    }
902
903    #[test]
904    fn encoder_new_to_writer() {
905        let mut buf = Vec::new();
906        let enc = Encoder::new_to(&mut buf).unwrap();
907        drop(enc);
908        assert!(buf.len() >= 5);
909        assert_eq!(&buf[..5], &[0x54, 0x52, 0x43, 0x00, 1]);
910    }
911
912    #[test]
913    fn decoder_into_encoder_appends_without_header() {
914        use crate::decoder::{DecodedFrame, Decoder};
915
916        // Create a trace with a header, a schema, and an event
917        let mut enc = Encoder::new();
918        let schema = enc
919            .register_schema(
920                "Ev",
921                vec![FieldDef {
922                    name: "v".into(),
923                    field_type: FieldType::Varint,
924                }],
925            )
926            .unwrap();
927        enc.write_event(&schema, &[FieldValue::Varint(1_000), FieldValue::Varint(1)])
928            .unwrap();
929        let base = enc.finish();
930
931        // Decode all frames, then convert into an encoder that appends to output
932        let mut decoder = Decoder::new(&base).unwrap();
933        while decoder.next_frame_ref().ok().flatten().is_some() {}
934        let mut output = Vec::new();
935        let mut ext = decoder.into_encoder(&mut output);
936        // Schema "Ev" is already known — no duplicate schema frame emitted
937        ext.write_event(&schema, &[FieldValue::Varint(2_000), FieldValue::Varint(2)])
938            .unwrap();
939        drop(ext);
940
941        // Concatenate and decode
942        let mut combined = base.clone();
943        combined.extend_from_slice(&output);
944        let mut dec = Decoder::new(&combined).unwrap();
945        let events: Vec<_> = dec
946            .decode_all()
947            .into_iter()
948            .filter_map(|f| match f {
949                DecodedFrame::Event {
950                    timestamp_ns,
951                    values,
952                    ..
953                } => Some((timestamp_ns, values)),
954                _ => None,
955            })
956            .collect();
957        assert_eq!(events.len(), 2);
958        assert_eq!(events[0].0, Some(1_000));
959        assert_eq!(events[1].0, Some(2_000));
960    }
961
962    #[test]
963    fn decoder_into_encoder_deduplicates_interned_strings() {
964        use crate::decoder::{DecodedFrame, Decoder};
965
966        // Create a trace with an interned string
967        let mut enc = Encoder::new();
968        let id1 = enc.intern_string("hello").unwrap();
969        let base = enc.finish();
970
971        // Decode all frames, then convert into an encoder
972        let mut decoder = Decoder::new(&base).unwrap();
973        while decoder.next_frame_ref().ok().flatten().is_some() {}
974        let mut output = Vec::new();
975        let mut ext = decoder.into_encoder(&mut output);
976        // "hello" is already interned, should reuse the same ID
977        let id2 = ext.intern_string("hello").unwrap();
978        let id3 = ext.intern_string("world").unwrap();
979        drop(ext);
980
981        assert_eq!(id1, id2, "existing string should reuse pool ID");
982        assert_ne!(id2, id3);
983
984        // "hello" should not produce a new StringPool frame; "world" should
985        let mut combined = base.clone();
986        combined.extend_from_slice(&output);
987        let mut dec = Decoder::new(&combined).unwrap();
988        let frames = dec.decode_all();
989        let pool_frames: Vec<_> = frames
990            .iter()
991            .filter(|f| matches!(f, DecodedFrame::StringPool(_)))
992            .collect();
993        // One from the base trace ("hello"), one from extend ("world")
994        assert_eq!(pool_frames.len(), 2);
995    }
996
997    #[test]
998    fn register_and_write() {
999        use crate::decoder::{DecodedFrame, Decoder};
1000
1001        let mut enc = Encoder::new();
1002        let schema = enc
1003            .register_schema(
1004                "MyEvent",
1005                vec![
1006                    FieldDef {
1007                        name: "count".into(),
1008                        field_type: FieldType::Varint,
1009                    },
1010                    FieldDef {
1011                        name: "name".into(),
1012                        field_type: FieldType::String,
1013                    },
1014                ],
1015            )
1016            .unwrap();
1017
1018        enc.write_event(
1019            &schema,
1020            &[
1021                FieldValue::Varint(1_000_000),
1022                FieldValue::Varint(42),
1023                FieldValue::String("hello".into()),
1024            ],
1025        )
1026        .unwrap();
1027
1028        let bytes = enc.finish();
1029        let mut dec = Decoder::new(&bytes).unwrap();
1030        let frames = dec.decode_all();
1031        let events: Vec<_> = frames
1032            .into_iter()
1033            .filter_map(|f| match f {
1034                DecodedFrame::Event {
1035                    timestamp_ns,
1036                    values,
1037                    ..
1038                } => Some((timestamp_ns, values)),
1039                _ => None,
1040            })
1041            .collect();
1042        assert_eq!(events.len(), 1);
1043        assert_eq!(events[0].0, Some(1_000_000));
1044        assert_eq!(events[0].1[0], FieldValue::Varint(42));
1045        assert_eq!(events[0].1[1], FieldValue::String("hello".into()));
1046    }
1047
1048    #[test]
1049    fn register_conflict_errors() {
1050        let mut enc = Encoder::new();
1051        enc.register_schema(
1052            "Ev",
1053            vec![FieldDef {
1054                name: "v".into(),
1055                field_type: FieldType::Varint,
1056            }],
1057        )
1058        .unwrap();
1059        let result = enc.register_schema(
1060            "Ev",
1061            vec![FieldDef {
1062                name: "other".into(),
1063                field_type: FieldType::Bool,
1064            }],
1065        );
1066        assert!(result.is_err());
1067    }
1068
1069    #[test]
1070    fn write_wrong_field_count_errors() {
1071        let mut enc = Encoder::new();
1072        let schema = enc
1073            .register_schema(
1074                "Ev",
1075                vec![FieldDef {
1076                    name: "v".into(),
1077                    field_type: FieldType::Varint,
1078                }],
1079            )
1080            .unwrap();
1081        // Pass 3 values (ts + 2 fields) for a 1-field schema
1082        let result = enc.write_event(
1083            &schema,
1084            &[
1085                FieldValue::Varint(0),
1086                FieldValue::Varint(1),
1087                FieldValue::Varint(2),
1088            ],
1089        );
1090        assert!(result.is_err());
1091    }
1092
1093    /// Verify that the encoder advances the timestamp base after each event,
1094    /// producing inter-event deltas rather than base-relative deltas.
1095    #[test]
1096    fn timestamp_base_advances_per_event() {
1097        use crate::decoder::{DecodedFrame, Decoder};
1098
1099        let mut enc = Encoder::new();
1100        let schema = enc
1101            .register_schema(
1102                "Ev",
1103                vec![FieldDef {
1104                    name: "v".into(),
1105                    field_type: FieldType::Varint,
1106                }],
1107            )
1108            .unwrap();
1109
1110        let ts1 = 12_000_000u64;
1111        let ts2 = 24_000_000u64;
1112        enc.write_event(&schema, &[FieldValue::Varint(ts1), FieldValue::Varint(1)])
1113            .unwrap();
1114        enc.write_event(&schema, &[FieldValue::Varint(ts2), FieldValue::Varint(2)])
1115            .unwrap();
1116
1117        let bytes = enc.finish();
1118
1119        let reset_count = bytes.iter().filter(|&&b| b == 0x05).count();
1120        assert_eq!(
1121            reset_count, 0,
1122            "base should advance per event, avoiding unnecessary resets"
1123        );
1124
1125        let mut dec = Decoder::new(&bytes).unwrap();
1126        let events: Vec<_> = dec
1127            .decode_all()
1128            .into_iter()
1129            .filter_map(|f| match f {
1130                DecodedFrame::Event { timestamp_ns, .. } => timestamp_ns,
1131                _ => None,
1132            })
1133            .collect();
1134        assert_eq!(events, vec![ts1, ts2]);
1135    }
1136
1137    #[test]
1138    fn reset_to_preserves_capacity() {
1139        let mut enc = Encoder::new();
1140        for i in 0..100 {
1141            enc.intern_string(&format!("string_{}", i)).unwrap();
1142        }
1143        let cap_before = enc.string_pool.capacity();
1144        let _bytes = enc.reset_to(Vec::new());
1145        let cap_after = enc.string_pool.capacity();
1146        assert_eq!(
1147            cap_before, cap_after,
1148            "string_pool capacity should be preserved after reset_to"
1149        );
1150    }
1151
1152    #[test]
1153    fn reset_to_returns_old_data_and_clears_state() {
1154        use crate::decoder::{DecodedFrame, Decoder};
1155
1156        let mut enc = Encoder::new();
1157        let schema = enc
1158            .register_schema(
1159                "Ev",
1160                vec![FieldDef {
1161                    name: "v".into(),
1162                    field_type: FieldType::Varint,
1163                }],
1164            )
1165            .unwrap();
1166        enc.write_event(
1167            &schema,
1168            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
1169        )
1170        .unwrap();
1171        let _s = enc.intern_string("hello").unwrap();
1172
1173        let old_bytes_written = enc.bytes_written();
1174        assert!(old_bytes_written > 0);
1175
1176        // --- reset ---
1177        let old = enc.reset_to_infallible(Vec::new());
1178
1179        // Invariant 1: old writer contains the data we wrote (decodable)
1180        let mut dec = Decoder::new(&old).unwrap();
1181        let frames = dec.decode_all();
1182        assert!(frames.iter().any(|f| matches!(f, DecodedFrame::Schema(_))));
1183        assert!(
1184            frames
1185                .iter()
1186                .any(|f| matches!(f, DecodedFrame::Event { .. }))
1187        );
1188        assert!(
1189            frames
1190                .iter()
1191                .any(|f| matches!(f, DecodedFrame::StringPool(_)))
1192        );
1193
1194        // Invariant 2: bytes_written resets to just the header size
1195        assert!(
1196            enc.bytes_written() < old_bytes_written,
1197            "bytes_written should reset (got {} vs old {})",
1198            enc.bytes_written(),
1199            old_bytes_written
1200        );
1201
1202        // Invariant 3: schemas are cleared — same schema must re-register
1203        // (write_event auto-registers, so we verify a new schema frame appears)
1204        enc.write_event(
1205            &schema,
1206            &[FieldValue::Varint(2_000), FieldValue::Varint(99)],
1207        )
1208        .unwrap();
1209
1210        // Invariant 4: string pool is cleared — re-interning emits a new pool frame
1211        let _s2 = enc.intern_string("hello").unwrap();
1212
1213        // Invariant 5: new output is a valid standalone trace
1214        let new_bytes = enc.reset_to_infallible(Vec::new());
1215        let mut dec2 = Decoder::new(&new_bytes).unwrap();
1216        let new_frames = dec2.decode_all();
1217        // Must have its own schema definition (not relying on old encoder state)
1218        assert!(
1219            new_frames
1220                .iter()
1221                .any(|f| matches!(f, DecodedFrame::Schema(s) if s.name == "Ev")),
1222            "new trace must contain schema definition"
1223        );
1224        // Must have its own string pool entry
1225        assert!(
1226            new_frames
1227                .iter()
1228                .any(|f| matches!(f, DecodedFrame::StringPool(_))),
1229            "new trace must contain string pool"
1230        );
1231        // Event must decode with correct timestamp (timestamp_base was reset)
1232        let event = new_frames
1233            .iter()
1234            .find_map(|f| match f {
1235                DecodedFrame::Event {
1236                    timestamp_ns,
1237                    values,
1238                    ..
1239                } => Some((timestamp_ns, values)),
1240                _ => None,
1241            })
1242            .expect("new trace must contain event");
1243        assert_eq!(*event.0, Some(2_000));
1244        assert_eq!(event.1[0], FieldValue::Varint(99));
1245    }
1246
1247    #[test]
1248    fn into_raw_encoder_preserves_byte_count() {
1249        let mut enc = Encoder::new();
1250        let schema = enc
1251            .register_schema(
1252                "Ev",
1253                vec![FieldDef {
1254                    name: "v".into(),
1255                    field_type: FieldType::Varint,
1256                }],
1257            )
1258            .unwrap();
1259        enc.write_event(
1260            &schema,
1261            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
1262        )
1263        .unwrap();
1264
1265        let bytes_before = enc.bytes_written();
1266        assert!(bytes_before > 0);
1267
1268        let raw = enc.into_raw_encoder();
1269        assert_eq!(
1270            raw.bytes_written(),
1271            bytes_before,
1272            "byte count must be preserved across conversion"
1273        );
1274    }
1275
1276    #[test]
1277    fn raw_encoder_write_raw_and_bytes_written() {
1278        let enc = Encoder::new();
1279        let initial = enc.bytes_written();
1280        let mut raw = enc.into_raw_encoder();
1281
1282        let payload = [0xAA; 100];
1283        raw.write_raw(&payload).unwrap();
1284
1285        assert_eq!(
1286            raw.bytes_written(),
1287            initial + payload.len() as u64,
1288            "bytes_written must include raw payload"
1289        );
1290    }
1291
1292    #[test]
1293    fn raw_encoder_into_inner_returns_all_data() {
1294        use crate::decoder::{DecodedFrame, Decoder};
1295
1296        // Write a structured event via Encoder, then append a raw batch
1297        // via RawEncoder, and verify the combined output decodes correctly.
1298        let mut enc = Encoder::new();
1299        let schema = enc
1300            .register_schema(
1301                "Ev",
1302                vec![FieldDef {
1303                    name: "v".into(),
1304                    field_type: FieldType::Varint,
1305                }],
1306            )
1307            .unwrap();
1308        enc.write_event(&schema, &[FieldValue::Varint(1_000), FieldValue::Varint(1)])
1309            .unwrap();
1310
1311        // Build a raw batch with the same schema
1312        let raw_batch = {
1313            let mut batch_enc = Encoder::new();
1314            batch_enc
1315                .write_event(&schema, &[FieldValue::Varint(2_000), FieldValue::Varint(2)])
1316                .unwrap();
1317            batch_enc.finish()
1318        };
1319
1320        let mut raw = enc.into_raw_encoder();
1321        raw.write_raw(&raw_batch).unwrap();
1322        let combined = raw.into_inner();
1323
1324        let mut dec = Decoder::new(&combined).unwrap();
1325        let events: Vec<_> = dec
1326            .decode_all()
1327            .into_iter()
1328            .filter_map(|f| match f {
1329                DecodedFrame::Event {
1330                    timestamp_ns,
1331                    values,
1332                    ..
1333                } => Some((timestamp_ns, values)),
1334                _ => None,
1335            })
1336            .collect();
1337
1338        assert_eq!(events.len(), 2);
1339        assert_eq!(events[0].0, Some(1_000));
1340        assert_eq!(events[0].1, vec![FieldValue::Varint(1)]);
1341        assert_eq!(events[1].0, Some(2_000));
1342        assert_eq!(events[1].1, vec![FieldValue::Varint(2)]);
1343    }
1344}