Skip to main content

dial9_trace_format/
encoder.rs

1//! High-level encoder for writing trace files.
2//!
3//! [`Encoder`] writes the file header, registers schemas, interns strings, and
4//! encodes events with delta-compressed timestamps. It is the primary entry
5//! point for producing trace data.
6
7use crate::TraceEvent;
8use crate::codec::{self, PoolEntry, WireTypeId};
9use crate::schema::{SchemaEntry, SchemaRegistry};
10use crate::types::{CountingWriter, EncodeState, EventEncoder, InternedString};
11use std::any::TypeId;
12use std::collections::HashMap;
13use std::hash::{BuildHasherDefault, Hasher};
14use std::io::{self, Write};
15use std::sync::Arc;
16
/// A fast, non-cryptographic hasher using FxHash's rotate-xor-multiply round.
///
/// Every input word is folded into the state as
/// `state = (state.rotate_left(5) ^ word).wrapping_mul(K)`, the mixing step
/// used by FxHash. It is far cheaper than the standard library's SipHash for
/// the short keys this encoder hashes (`TypeId`s, schema names, interned
/// strings). It provides **no** collision resistance and no HashDoS
/// protection, so it must only be used for keys that are not
/// attacker-controlled.
///
/// Byte slices passed to [`Hasher::write`] are folded one byte at a time.
#[doc(hidden)]
#[derive(Default)]
pub struct FxHasher(u64);

impl FxHasher {
    /// FxHash's odd 64-bit multiplicative constant.
    const K: u64 = 0x517cc1b727220a95;

    /// Fold a single word into the running hash state.
    #[inline]
    fn add_to_hash(&mut self, word: u64) {
        self.0 = (self.0.rotate_left(5) ^ word).wrapping_mul(Self::K);
    }
}

impl Hasher for FxHasher {
    #[inline]
    fn write(&mut self, bytes: &[u8]) {
        for &byte in bytes {
            self.add_to_hash(u64::from(byte));
        }
    }

    #[inline]
    fn write_u64(&mut self, i: u64) {
        self.add_to_hash(i);
    }

    #[inline]
    fn write_u32(&mut self, i: u32) {
        self.add_to_hash(u64::from(i));
    }

    #[inline]
    fn write_u16(&mut self, i: u16) {
        self.add_to_hash(u64::from(i));
    }

    #[inline]
    fn write_usize(&mut self, i: usize) {
        // Lossless on every target with a pointer width of at most 64 bits.
        self.add_to_hash(i as u64);
    }

    #[inline]
    fn write_u128(&mut self, i: u128) {
        // Fold the low half first, then the high half (truncating casts are
        // intentional: each selects one 64-bit half).
        self.add_to_hash(i as u64);
        self.add_to_hash((i >> 64) as u64);
    }

    #[inline]
    fn finish(&self) -> u64 {
        self.0
    }
}

#[doc(hidden)]
pub type FxBuildHasher = BuildHasherDefault<FxHasher>;
#[doc(hidden)]
pub type FxHashMap<K, V> = HashMap<K, V, FxBuildHasher>;
69
70/// A schema handle returned by [`Encoder::register_schema`] or created via
71/// [`Schema::new`].
72///
73/// Carries the full schema definition (name + fields) so it can auto-register
74/// itself with any encoder on first use. This means a `Schema` created on one
75/// encoder can be passed to a different encoder and it will just work.
76///
77/// `Schema` is cheap to clone (internally `Arc`-backed).
78#[derive(Clone, Debug)]
79pub struct Schema {
80    pub(crate) entry: Arc<SchemaEntry>,
81    /// Pre-computed `Arc<str>` of the schema name, used as a cheap HashMap key
82    /// (clone is a pointer bump instead of a String allocation).
83    name_key: Arc<str>,
84}
85
86impl Schema {
87    /// Create a schema handle without an encoder.
88    ///
89    /// The schema will be lazily registered the first time it is passed to
90    /// [`Encoder::write_event`].
91    pub fn new(name: &str, fields: Vec<crate::schema::FieldDef>) -> Self {
92        let name_key: Arc<str> = Arc::from(name);
93        Self {
94            entry: Arc::new(SchemaEntry {
95                name: name.to_string(),
96                has_timestamp: true,
97                fields,
98            }),
99            name_key,
100        }
101    }
102
103    /// Schema name.
104    pub fn name(&self) -> &str {
105        &self.entry.name
106    }
107
108    /// Schema field definitions.
109    pub fn fields(&self) -> &[crate::schema::FieldDef] {
110        &self.entry.fields
111    }
112}
113
/// Lookup key for an encoder's schema-id cache.
///
/// Schemas registered by hand (or lazily through a [`Schema`] handle) are
/// found by name; schemas of types written through `Encoder::write` are
/// found by their Rust `TypeId`.
#[derive(Clone, Hash, PartialEq, Eq)]
enum SchemaKey {
    /// Keyed by schema name.
    Name(Arc<str>),
    /// Keyed by the Rust type of an event passed to `Encoder::write`.
    RustType(TypeId),
}
121
122/// Trace file encoder.
123///
124/// Writes the binary file header, registers event schemas, interns strings
125/// into a pool, and encodes events with delta-compressed timestamps.
126///
127/// The default type parameter (`Vec<u8>`) buffers everything in memory;
128/// use [`Encoder::new_to`] to write to an arbitrary [`Write`] sink.
129pub struct Encoder<W: Write = Vec<u8>> {
130    state: EncodeState<W>,
131    registry: SchemaRegistry,
132    string_pool: FxHashMap<String, u32>,
133    next_pool_id: u32,
134    schema_ids: FxHashMap<SchemaKey, WireTypeId>,
135}
136
137impl Default for Encoder<Vec<u8>> {
138    fn default() -> Self {
139        Self::new()
140    }
141}
142
143impl Encoder<Vec<u8>> {
144    pub fn new() -> Self {
145        let mut buf = Vec::new();
146        codec::encode_header(&mut buf).expect("Vec::write_all cannot fail");
147        Self {
148            state: EncodeState::new(buf),
149            registry: SchemaRegistry::new(),
150            string_pool: FxHashMap::default(),
151            next_pool_id: 0,
152            schema_ids: FxHashMap::default(),
153        }
154    }
155
156    /// Consume the encoder and return the encoded bytes.
157    pub fn finish(self) -> Vec<u8> {
158        self.state.writer.into_inner()
159    }
160}
161
162impl<W: Write> Encoder<W> {
163    /// Create an encoder that writes to an arbitrary writer.
164    /// Writes the file header immediately.
165    pub fn new_to(mut writer: W) -> io::Result<Self> {
166        codec::encode_header(&mut writer)?;
167        Ok(Self {
168            state: EncodeState::new(writer),
169            registry: SchemaRegistry::new(),
170            string_pool: FxHashMap::default(),
171            next_pool_id: 0,
172            schema_ids: FxHashMap::default(),
173        })
174    }
175
176    /// Create an encoder seeded from decoded state. Used by
177    /// [`Decoder::into_encoder`](crate::decoder::Decoder::into_encoder).
178    pub(crate) fn from_decoder(
179        mut registry: SchemaRegistry,
180        string_pool: crate::decoder::StringPool,
181        timestamp_base_ns: u64,
182        writer: W,
183    ) -> Self {
184        let mut pool = FxHashMap::default();
185        let mut next_pool_id: u32 = 0;
186        for (id, value) in string_pool.0.into_iter() {
187            pool.insert(value, id.raw_id());
188            if id.raw_id() >= next_pool_id {
189                next_pool_id = id.raw_id() + 1;
190            }
191        }
192
193        let mut schema_ids = FxHashMap::default();
194        for (wire_id, entry) in registry.entries() {
195            schema_ids.insert(SchemaKey::Name(Arc::from(entry.name.as_str())), wire_id);
196        }
197        registry.sync_next_id();
198
199        let mut state = EncodeState::new(writer);
200        state.set_ts_base_unchecked(timestamp_base_ns);
201
202        Self {
203            state,
204            registry,
205            string_pool: pool,
206            next_pool_id,
207            schema_ids,
208        }
209    }
210
211    /// Consume the encoder and return the inner writer.
212    pub fn into_inner(self) -> W {
213        self.state.writer.into_inner()
214    }
215
216    /// Borrow the inner writer.
217    pub fn as_inner(&self) -> &W {
218        self.state.writer.inner()
219    }
220
221    /// Total bytes written through this encoder (including the file header).
222    pub fn bytes_written(&self) -> u64 {
223        self.state.writer.bytes_written()
224    }
225
226    /// Reset the encoder to a new writer, preserving internal allocations.
227    /// Returns the old writer. Writes a file header to the new writer.
228    pub fn reset_to(&mut self, mut new_writer: W) -> io::Result<W> {
229        codec::encode_header(&mut new_writer)?;
230        self.string_pool.clear();
231        self.next_pool_id = 0;
232        self.registry.clear();
233        self.schema_ids.clear();
234        // creating a new EncodeState resets the timestamp delta
235        let old_state = std::mem::replace(&mut self.state, EncodeState::new(new_writer));
236        Ok(old_state.writer.into_inner())
237    }
238
239    /// Ensure a schema is registered with this encoder. Returns the wire type
240    /// ID for this encoder's output stream.
241    ///
242    /// Idempotent if the schema matches. Errors if a different schema was
243    /// already registered under the same name.
244    fn ensure_registered(&mut self, schema: &Schema) -> io::Result<WireTypeId> {
245        let key = SchemaKey::Name(Arc::clone(&schema.name_key));
246        if let Some(&wire_id) = self.schema_ids.get(&key) {
247            // TODO: unify registry and schema_ids to avoid this error case
248            let Some(existing) = self.registry.get(wire_id) else {
249                return Err(io::Error::other(format!(
250                    "corrupted internal state. {wire_id:?} in schema_ids but not in registry."
251                )));
252            };
253            if *existing == *schema.entry {
254                return Ok(wire_id);
255            }
256            return Err(io::Error::new(
257                io::ErrorKind::InvalidInput,
258                format!(
259                    "schema already registered with different definition: {}",
260                    schema.name()
261                ),
262            ));
263        }
264        let id = self.registry.next_type_id();
265        codec::encode_schema(id, &schema.entry, &mut self.state.writer)?;
266        self.registry
267            .register(id, (*schema.entry).clone())
268            .expect("schema registration failed");
269        self.schema_ids.insert(key, id);
270        Ok(id)
271    }
272
273    /// Register a schema by name. Returns a [`Schema`] handle that can be
274    /// passed to [`write_event`](Self::write_event) (on this or any other
275    /// encoder).
276    ///
277    /// All schemas have timestamps. When writing events, the first element of
278    /// `values` must be `FieldValue::Varint(timestamp_ns)`. It is extracted and
279    /// encoded in the event header (not as a regular field).
280    ///
281    /// Eagerly writes the schema frame. Idempotent if the definition matches.
282    pub fn register_schema(
283        &mut self,
284        name: &str,
285        fields: Vec<crate::schema::FieldDef>,
286    ) -> io::Result<Schema> {
287        let schema = Schema::new(name, fields);
288        self.ensure_registered(&schema)?;
289        Ok(schema)
290    }
291
292    /// Write an event for a schema.
293    ///
294    /// The first element of `values` must be `FieldValue::Varint(timestamp_ns)`
295    /// — it is extracted and encoded in the event header, not as a regular
296    /// field. The remaining values must match the schema's field count.
297    ///
298    /// If this encoder hasn't seen `schema` before, it is auto-registered
299    /// (the schema frame is written before the event).
300    pub fn write_event(
301        &mut self,
302        schema: &Schema,
303        values: &[crate::types::FieldValue],
304    ) -> io::Result<()> {
305        use crate::types::FieldValue;
306
307        let type_id = self.ensure_registered(schema)?;
308        let expected_fields = schema.entry.fields.len();
309
310        let ts_ns = match values.first() {
311            Some(FieldValue::Varint(ns)) => *ns,
312            _ => {
313                return Err(io::Error::new(
314                    io::ErrorKind::InvalidInput,
315                    "first value must be FieldValue::Varint(timestamp_ns)",
316                ));
317            }
318        };
319        let field_values = &values[1..];
320
321        if field_values.len() != expected_fields {
322            return Err(io::Error::new(
323                io::ErrorKind::InvalidInput,
324                format!(
325                    "value count ({}) does not match schema field count ({}) for schema '{}'",
326                    field_values.len(),
327                    expected_fields,
328                    schema.name(),
329                ),
330            ));
331        }
332
333        let ts_delta = self.state.encode_timestamp_delta(ts_ns)?;
334        self.state.writer.write_all(&[codec::TAG_EVENT])?;
335        self.state.writer.write_all(&type_id.0.to_le_bytes())?;
336        codec::encode_u24_le(ts_delta, &mut self.state.writer)?;
337        let mut enc = EventEncoder::new(&mut self.state);
338        for (i, v) in field_values.iter().enumerate() {
339            enc.write_field_value(v, schema.entry.fields[i].field_type)?;
340        }
341        Ok(())
342    }
343
344    /// Write a derived TraceEvent. Auto-registers the schema on first call for this type.
345    /// Handles timestamp encoding: emits TimestampReset if needed, packs u24 delta in header.
346    pub fn write<T: TraceEvent + 'static>(&mut self, event: &T) -> io::Result<()> {
347        let key = SchemaKey::RustType(TypeId::of::<T>());
348        let tid = if let Some(&cached) = self.schema_ids.get(&key) {
349            cached
350        } else {
351            let entry = T::schema_entry();
352            let schema = Schema::new(&entry.name, entry.fields);
353            let id = self.ensure_registered(&schema)?;
354            self.schema_ids.insert(key, id);
355            id
356        };
357        let ts_ns = event.timestamp();
358        let ts_delta = self.state.encode_timestamp_delta(ts_ns)?;
359        self.state.writer.write_all(&[codec::TAG_EVENT])?;
360        self.state.writer.write_all(&tid.0.to_le_bytes())?;
361        codec::encode_u24_le(ts_delta, &mut self.state.writer)?;
362        let mut enc = EventEncoder::new(&mut self.state);
363        event.encode_fields(&mut enc)
364    }
365
366    /// Intern a string, emitting a pool frame if new. Returns an [`InternedString`] handle.
367    pub fn intern_string(&mut self, s: &str) -> io::Result<InternedString> {
368        if let Some(&id) = self.string_pool.get(s) {
369            return Ok(InternedString(id));
370        }
371        let id = self.next_pool_id;
372        self.next_pool_id += 1;
373        self.string_pool.insert(s.to_string(), id);
374        codec::encode_string_pool(
375            &[PoolEntry {
376                pool_id: id,
377                data: s.as_bytes().to_vec(),
378            }],
379            &mut self.state.writer,
380        )?;
381        Ok(InternedString(id))
382    }
383
384    pub fn write_string_pool(&mut self, entries: &[PoolEntry]) -> io::Result<()> {
385        codec::encode_string_pool(entries, &mut self.state.writer)
386    }
387
388    /// Flush the underlying writer.
389    pub fn flush(&mut self) -> io::Result<()> {
390        self.state.writer.flush()
391    }
392
393    /// Convert this encoder into a [`RawEncoder`] that only supports writing
394    /// pre-encoded bytes. The byte count is preserved so rotation decisions
395    /// remain correct.
396    ///
397    /// Use this after writing any structured data (headers, segment metadata)
398    /// to switch to a raw-only mode for appending pre-encoded batches.
399    pub fn into_raw_encoder(self) -> RawEncoder<W> {
400        RawEncoder {
401            writer: self.state.writer,
402        }
403    }
404}
405
406/// A write-only encoder that accepts pre-encoded bytes.
407///
408/// Created by [`Encoder::into_raw_encoder`] after the file header and any
409/// structured metadata have been written. Carries no schema registry, string
410/// pool, or timestamp state — it simply forwards bytes to the underlying
411/// writer while tracking the total byte count.
412pub struct RawEncoder<W> {
413    writer: CountingWriter<W>,
414}
415
416impl<W: Write> RawEncoder<W> {
417    /// Write pre-encoded bytes to the underlying writer.
418    pub fn write_raw(&mut self, bytes: &[u8]) -> io::Result<()> {
419        self.writer.write_all(bytes)
420    }
421
422    /// Total bytes written (including bytes written by the [`Encoder`] before
423    /// conversion).
424    pub fn bytes_written(&self) -> u64 {
425        self.writer.bytes_written()
426    }
427
428    /// Flush the underlying writer.
429    pub fn flush(&mut self) -> io::Result<()> {
430        self.writer.flush()
431    }
432
433    /// Consume the raw encoder and return the inner writer.
434    pub fn into_inner(self) -> W {
435        self.writer.into_inner()
436    }
437}
438
439impl Encoder<Vec<u8>> {
440    pub fn write_infallible<T: TraceEvent + 'static>(&mut self, event: &T) {
441        self.write(event).expect("writing to Vec<u8> is infallible")
442    }
443
444    pub fn intern_string_infallible(&mut self, s: &str) -> InternedString {
445        self.intern_string(s)
446            .expect("interning into Vec<u8> is infallible")
447    }
448
449    /// Resets the encoder to point to a new backing Vec returning the old one
450    pub fn reset_to_infallible(&mut self, data: Vec<u8>) -> Vec<u8> {
451        self.reset_to(data)
452            .expect("writing to Vec<u8> is infallible")
453    }
454}
455
456#[cfg(test)]
457mod tests {
458    use super::*;
459    use crate::schema::FieldDef;
460    use crate::types::{FieldType, FieldValue};
461
462    #[test]
463    fn encoder_writes_header() {
464        let enc = Encoder::new();
465        let data = enc.finish();
466        assert_eq!(&data[..5], &[0x54, 0x52, 0x43, 0x00, 1]);
467    }
468
469    #[test]
470    fn encoder_register_and_write_event() {
471        let mut enc = Encoder::new();
472        let schema = enc
473            .register_schema(
474                "Ev",
475                vec![FieldDef {
476                    name: "v".into(),
477                    field_type: FieldType::Varint,
478                }],
479            )
480            .unwrap();
481        enc.write_event(
482            &schema,
483            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
484        )
485        .unwrap();
486        let data = enc.finish();
487        assert!(data.len() > 5);
488    }
489
490    #[test]
491    fn idempotent_re_registration() {
492        let mut enc = Encoder::new();
493        let fields = vec![FieldDef {
494            name: "v".into(),
495            field_type: FieldType::Varint,
496        }];
497        let _s1 = enc.register_schema("Ev", fields.clone()).unwrap();
498        let _s2 = enc.register_schema("Ev", fields).unwrap();
499        // Both succeed — same schema, same name
500    }
501
502    #[test]
503    fn re_registration_different_schema_errors() {
504        let mut enc = Encoder::new();
505        enc.register_schema(
506            "Ev",
507            vec![FieldDef {
508                name: "v".into(),
509                field_type: FieldType::Varint,
510            }],
511        )
512        .unwrap();
513        let result = enc.register_schema(
514            "Ev",
515            vec![FieldDef {
516                name: "different".into(),
517                field_type: FieldType::Bool,
518            }],
519        );
520        assert!(result.is_err());
521    }
522
523    #[test]
524    fn schema_auto_registers_on_write() {
525        use crate::decoder::{DecodedFrame, Decoder};
526
527        // Create a schema without an encoder
528        let schema = Schema::new(
529            "Lazy",
530            vec![FieldDef {
531                name: "v".into(),
532                field_type: FieldType::Varint,
533            }],
534        );
535
536        // Write to an encoder that hasn't seen this schema — auto-registers
537        let mut enc = Encoder::new();
538        enc.write_event(
539            &schema,
540            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
541        )
542        .unwrap();
543
544        let bytes = enc.finish();
545        let mut dec = Decoder::new(&bytes).unwrap();
546        let frames = dec.decode_all();
547        assert!(matches!(&frames[0], DecodedFrame::Schema(s) if s.name == "Lazy"));
548        if let DecodedFrame::Event { values, .. } = &frames[1] {
549            assert_eq!(*values, vec![FieldValue::Varint(42)]);
550        } else {
551            panic!("expected event");
552        }
553    }
554
555    #[test]
556    fn schema_portable_across_encoders() {
557        use crate::decoder::{DecodedFrame, Decoder};
558
559        let mut enc1 = Encoder::new();
560        let schema = enc1
561            .register_schema(
562                "Shared",
563                vec![FieldDef {
564                    name: "v".into(),
565                    field_type: FieldType::Varint,
566                }],
567            )
568            .unwrap();
569        enc1.write_event(&schema, &[FieldValue::Varint(1_000), FieldValue::Varint(1)])
570            .unwrap();
571
572        // Pass the same Schema to a different encoder
573        let mut enc2 = Encoder::new();
574        enc2.write_event(&schema, &[FieldValue::Varint(2_000), FieldValue::Varint(2)])
575            .unwrap();
576
577        // Both encoders produce valid output
578        for (enc, expected_val) in [(enc1, 1u64), (enc2, 2u64)] {
579            let bytes = enc.finish();
580            let mut dec = Decoder::new(&bytes).unwrap();
581            let frames = dec.decode_all();
582            let event = frames
583                .iter()
584                .find(|f| matches!(f, DecodedFrame::Event { .. }))
585                .unwrap();
586            if let DecodedFrame::Event { values, .. } = event {
587                assert_eq!(values[0], FieldValue::Varint(expected_val));
588            }
589        }
590    }
591
592    #[test]
593    fn encoder_intern_string_deduplicates() {
594        let mut enc = Encoder::new();
595        let id1 = enc.intern_string("hello").unwrap();
596        let id2 = enc.intern_string("hello").unwrap();
597        let id3 = enc.intern_string("world").unwrap();
598        assert_eq!(id1, id2);
599        assert_ne!(id1, id3);
600    }
601
602    #[test]
603    fn timestamp_round_trip() {
604        use crate::decoder::{DecodedFrame, Decoder};
605
606        let mut enc = Encoder::new();
607        let schema = enc
608            .register_schema(
609                "TS",
610                vec![FieldDef {
611                    name: "v".into(),
612                    field_type: FieldType::Varint,
613                }],
614            )
615            .unwrap();
616
617        let ts1 = 100_000u64;
618        let ts2 = 50_000u64;
619        let ts3 = 200_000_000u64;
620        let ts4 = 100_000_000u64;
621        enc.write_event(&schema, &[FieldValue::Varint(ts1), FieldValue::Varint(1)])
622            .unwrap();
623        enc.write_event(&schema, &[FieldValue::Varint(ts2), FieldValue::Varint(2)])
624            .unwrap();
625        enc.write_event(&schema, &[FieldValue::Varint(ts3), FieldValue::Varint(3)])
626            .unwrap();
627        enc.write_event(&schema, &[FieldValue::Varint(ts4), FieldValue::Varint(4)])
628            .unwrap();
629
630        let bytes = enc.finish();
631        let mut dec = Decoder::new(&bytes).unwrap();
632        let events: Vec<_> = dec
633            .decode_all()
634            .into_iter()
635            .filter_map(|f| match f {
636                DecodedFrame::Event {
637                    timestamp_ns,
638                    values,
639                    ..
640                } => Some((timestamp_ns, values)),
641                _ => None,
642            })
643            .collect();
644
645        assert_eq!(events.len(), 4);
646        assert_eq!(events[0].0, Some(ts1));
647        assert_eq!(events[0].1, vec![FieldValue::Varint(1)]);
648        assert_eq!(events[1].0, Some(ts2));
649        assert_eq!(events[1].1, vec![FieldValue::Varint(2)]);
650        assert_eq!(events[2].0, Some(ts3));
651        assert_eq!(events[2].1, vec![FieldValue::Varint(3)]);
652        assert_eq!(events[3].0, Some(ts4));
653        assert_eq!(events[3].1, vec![FieldValue::Varint(4)]);
654    }
655
656    #[test]
657    fn encoder_new_to_writer() {
658        let mut buf = Vec::new();
659        let enc = Encoder::new_to(&mut buf).unwrap();
660        drop(enc);
661        assert!(buf.len() >= 5);
662        assert_eq!(&buf[..5], &[0x54, 0x52, 0x43, 0x00, 1]);
663    }
664
665    #[test]
666    fn decoder_into_encoder_appends_without_header() {
667        use crate::decoder::{DecodedFrame, Decoder};
668
669        // Create a trace with a header, a schema, and an event
670        let mut enc = Encoder::new();
671        let schema = enc
672            .register_schema(
673                "Ev",
674                vec![FieldDef {
675                    name: "v".into(),
676                    field_type: FieldType::Varint,
677                }],
678            )
679            .unwrap();
680        enc.write_event(&schema, &[FieldValue::Varint(1_000), FieldValue::Varint(1)])
681            .unwrap();
682        let base = enc.finish();
683
684        // Decode all frames, then convert into an encoder that appends to output
685        let mut decoder = Decoder::new(&base).unwrap();
686        while decoder.next_frame_ref().ok().flatten().is_some() {}
687        let mut output = Vec::new();
688        let mut ext = decoder.into_encoder(&mut output);
689        // Schema "Ev" is already known — no duplicate schema frame emitted
690        ext.write_event(&schema, &[FieldValue::Varint(2_000), FieldValue::Varint(2)])
691            .unwrap();
692        drop(ext);
693
694        // Concatenate and decode
695        let mut combined = base.clone();
696        combined.extend_from_slice(&output);
697        let mut dec = Decoder::new(&combined).unwrap();
698        let events: Vec<_> = dec
699            .decode_all()
700            .into_iter()
701            .filter_map(|f| match f {
702                DecodedFrame::Event {
703                    timestamp_ns,
704                    values,
705                    ..
706                } => Some((timestamp_ns, values)),
707                _ => None,
708            })
709            .collect();
710        assert_eq!(events.len(), 2);
711        assert_eq!(events[0].0, Some(1_000));
712        assert_eq!(events[1].0, Some(2_000));
713    }
714
715    #[test]
716    fn decoder_into_encoder_deduplicates_interned_strings() {
717        use crate::decoder::{DecodedFrame, Decoder};
718
719        // Create a trace with an interned string
720        let mut enc = Encoder::new();
721        let id1 = enc.intern_string("hello").unwrap();
722        let base = enc.finish();
723
724        // Decode all frames, then convert into an encoder
725        let mut decoder = Decoder::new(&base).unwrap();
726        while decoder.next_frame_ref().ok().flatten().is_some() {}
727        let mut output = Vec::new();
728        let mut ext = decoder.into_encoder(&mut output);
729        // "hello" is already interned, should reuse the same ID
730        let id2 = ext.intern_string("hello").unwrap();
731        let id3 = ext.intern_string("world").unwrap();
732        drop(ext);
733
734        assert_eq!(id1, id2, "existing string should reuse pool ID");
735        assert_ne!(id2, id3);
736
737        // "hello" should not produce a new StringPool frame; "world" should
738        let mut combined = base.clone();
739        combined.extend_from_slice(&output);
740        let mut dec = Decoder::new(&combined).unwrap();
741        let frames = dec.decode_all();
742        let pool_frames: Vec<_> = frames
743            .iter()
744            .filter(|f| matches!(f, DecodedFrame::StringPool(_)))
745            .collect();
746        // One from the base trace ("hello"), one from extend ("world")
747        assert_eq!(pool_frames.len(), 2);
748    }
749
750    #[test]
751    fn register_and_write() {
752        use crate::decoder::{DecodedFrame, Decoder};
753
754        let mut enc = Encoder::new();
755        let schema = enc
756            .register_schema(
757                "MyEvent",
758                vec![
759                    FieldDef {
760                        name: "count".into(),
761                        field_type: FieldType::Varint,
762                    },
763                    FieldDef {
764                        name: "name".into(),
765                        field_type: FieldType::String,
766                    },
767                ],
768            )
769            .unwrap();
770
771        enc.write_event(
772            &schema,
773            &[
774                FieldValue::Varint(1_000_000),
775                FieldValue::Varint(42),
776                FieldValue::String("hello".into()),
777            ],
778        )
779        .unwrap();
780
781        let bytes = enc.finish();
782        let mut dec = Decoder::new(&bytes).unwrap();
783        let frames = dec.decode_all();
784        let events: Vec<_> = frames
785            .into_iter()
786            .filter_map(|f| match f {
787                DecodedFrame::Event {
788                    timestamp_ns,
789                    values,
790                    ..
791                } => Some((timestamp_ns, values)),
792                _ => None,
793            })
794            .collect();
795        assert_eq!(events.len(), 1);
796        assert_eq!(events[0].0, Some(1_000_000));
797        assert_eq!(events[0].1[0], FieldValue::Varint(42));
798        assert_eq!(events[0].1[1], FieldValue::String("hello".into()));
799    }
800
801    #[test]
802    fn register_conflict_errors() {
803        let mut enc = Encoder::new();
804        enc.register_schema(
805            "Ev",
806            vec![FieldDef {
807                name: "v".into(),
808                field_type: FieldType::Varint,
809            }],
810        )
811        .unwrap();
812        let result = enc.register_schema(
813            "Ev",
814            vec![FieldDef {
815                name: "other".into(),
816                field_type: FieldType::Bool,
817            }],
818        );
819        assert!(result.is_err());
820    }
821
822    #[test]
823    fn write_wrong_field_count_errors() {
824        let mut enc = Encoder::new();
825        let schema = enc
826            .register_schema(
827                "Ev",
828                vec![FieldDef {
829                    name: "v".into(),
830                    field_type: FieldType::Varint,
831                }],
832            )
833            .unwrap();
834        // Pass 3 values (ts + 2 fields) for a 1-field schema
835        let result = enc.write_event(
836            &schema,
837            &[
838                FieldValue::Varint(0),
839                FieldValue::Varint(1),
840                FieldValue::Varint(2),
841            ],
842        );
843        assert!(result.is_err());
844    }
845
846    /// Verify that the encoder advances the timestamp base after each event,
847    /// producing inter-event deltas rather than base-relative deltas.
848    #[test]
849    fn timestamp_base_advances_per_event() {
850        use crate::decoder::{DecodedFrame, Decoder};
851
852        let mut enc = Encoder::new();
853        let schema = enc
854            .register_schema(
855                "Ev",
856                vec![FieldDef {
857                    name: "v".into(),
858                    field_type: FieldType::Varint,
859                }],
860            )
861            .unwrap();
862
863        let ts1 = 12_000_000u64;
864        let ts2 = 24_000_000u64;
865        enc.write_event(&schema, &[FieldValue::Varint(ts1), FieldValue::Varint(1)])
866            .unwrap();
867        enc.write_event(&schema, &[FieldValue::Varint(ts2), FieldValue::Varint(2)])
868            .unwrap();
869
870        let bytes = enc.finish();
871
872        let reset_count = bytes.iter().filter(|&&b| b == 0x05).count();
873        assert_eq!(
874            reset_count, 0,
875            "base should advance per event, avoiding unnecessary resets"
876        );
877
878        let mut dec = Decoder::new(&bytes).unwrap();
879        let events: Vec<_> = dec
880            .decode_all()
881            .into_iter()
882            .filter_map(|f| match f {
883                DecodedFrame::Event { timestamp_ns, .. } => timestamp_ns,
884                _ => None,
885            })
886            .collect();
887        assert_eq!(events, vec![ts1, ts2]);
888    }
889
890    #[test]
891    fn reset_to_preserves_capacity() {
892        let mut enc = Encoder::new();
893        for i in 0..100 {
894            enc.intern_string(&format!("string_{}", i)).unwrap();
895        }
896        let cap_before = enc.string_pool.capacity();
897        let _bytes = enc.reset_to(Vec::new());
898        let cap_after = enc.string_pool.capacity();
899        assert_eq!(
900            cap_before, cap_after,
901            "string_pool capacity should be preserved after reset_to"
902        );
903    }
904
905    #[test]
906    fn reset_to_returns_old_data_and_clears_state() {
907        use crate::decoder::{DecodedFrame, Decoder};
908
909        let mut enc = Encoder::new();
910        let schema = enc
911            .register_schema(
912                "Ev",
913                vec![FieldDef {
914                    name: "v".into(),
915                    field_type: FieldType::Varint,
916                }],
917            )
918            .unwrap();
919        enc.write_event(
920            &schema,
921            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
922        )
923        .unwrap();
924        let _s = enc.intern_string("hello").unwrap();
925
926        let old_bytes_written = enc.bytes_written();
927        assert!(old_bytes_written > 0);
928
929        // --- reset ---
930        let old = enc.reset_to_infallible(Vec::new());
931
932        // Invariant 1: old writer contains the data we wrote (decodable)
933        let mut dec = Decoder::new(&old).unwrap();
934        let frames = dec.decode_all();
935        assert!(frames.iter().any(|f| matches!(f, DecodedFrame::Schema(_))));
936        assert!(
937            frames
938                .iter()
939                .any(|f| matches!(f, DecodedFrame::Event { .. }))
940        );
941        assert!(
942            frames
943                .iter()
944                .any(|f| matches!(f, DecodedFrame::StringPool(_)))
945        );
946
947        // Invariant 2: bytes_written resets to just the header size
948        assert!(
949            enc.bytes_written() < old_bytes_written,
950            "bytes_written should reset (got {} vs old {})",
951            enc.bytes_written(),
952            old_bytes_written
953        );
954
955        // Invariant 3: schemas are cleared — same schema must re-register
956        // (write_event auto-registers, so we verify a new schema frame appears)
957        enc.write_event(
958            &schema,
959            &[FieldValue::Varint(2_000), FieldValue::Varint(99)],
960        )
961        .unwrap();
962
963        // Invariant 4: string pool is cleared — re-interning emits a new pool frame
964        let _s2 = enc.intern_string("hello").unwrap();
965
966        // Invariant 5: new output is a valid standalone trace
967        let new_bytes = enc.reset_to_infallible(Vec::new());
968        let mut dec2 = Decoder::new(&new_bytes).unwrap();
969        let new_frames = dec2.decode_all();
970        // Must have its own schema definition (not relying on old encoder state)
971        assert!(
972            new_frames
973                .iter()
974                .any(|f| matches!(f, DecodedFrame::Schema(s) if s.name == "Ev")),
975            "new trace must contain schema definition"
976        );
977        // Must have its own string pool entry
978        assert!(
979            new_frames
980                .iter()
981                .any(|f| matches!(f, DecodedFrame::StringPool(_))),
982            "new trace must contain string pool"
983        );
984        // Event must decode with correct timestamp (timestamp_base was reset)
985        let event = new_frames
986            .iter()
987            .find_map(|f| match f {
988                DecodedFrame::Event {
989                    timestamp_ns,
990                    values,
991                    ..
992                } => Some((timestamp_ns, values)),
993                _ => None,
994            })
995            .expect("new trace must contain event");
996        assert_eq!(*event.0, Some(2_000));
997        assert_eq!(event.1[0], FieldValue::Varint(99));
998    }
999
1000    #[test]
1001    fn into_raw_encoder_preserves_byte_count() {
1002        let mut enc = Encoder::new();
1003        let schema = enc
1004            .register_schema(
1005                "Ev",
1006                vec![FieldDef {
1007                    name: "v".into(),
1008                    field_type: FieldType::Varint,
1009                }],
1010            )
1011            .unwrap();
1012        enc.write_event(
1013            &schema,
1014            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
1015        )
1016        .unwrap();
1017
1018        let bytes_before = enc.bytes_written();
1019        assert!(bytes_before > 0);
1020
1021        let raw = enc.into_raw_encoder();
1022        assert_eq!(
1023            raw.bytes_written(),
1024            bytes_before,
1025            "byte count must be preserved across conversion"
1026        );
1027    }
1028
1029    #[test]
1030    fn raw_encoder_write_raw_and_bytes_written() {
1031        let enc = Encoder::new();
1032        let initial = enc.bytes_written();
1033        let mut raw = enc.into_raw_encoder();
1034
1035        let payload = [0xAA; 100];
1036        raw.write_raw(&payload).unwrap();
1037
1038        assert_eq!(
1039            raw.bytes_written(),
1040            initial + payload.len() as u64,
1041            "bytes_written must include raw payload"
1042        );
1043    }
1044
1045    #[test]
1046    fn raw_encoder_into_inner_returns_all_data() {
1047        use crate::decoder::{DecodedFrame, Decoder};
1048
1049        // Write a structured event via Encoder, then append a raw batch
1050        // via RawEncoder, and verify the combined output decodes correctly.
1051        let mut enc = Encoder::new();
1052        let schema = enc
1053            .register_schema(
1054                "Ev",
1055                vec![FieldDef {
1056                    name: "v".into(),
1057                    field_type: FieldType::Varint,
1058                }],
1059            )
1060            .unwrap();
1061        enc.write_event(&schema, &[FieldValue::Varint(1_000), FieldValue::Varint(1)])
1062            .unwrap();
1063
1064        // Build a raw batch with the same schema
1065        let raw_batch = {
1066            let mut batch_enc = Encoder::new();
1067            batch_enc
1068                .write_event(&schema, &[FieldValue::Varint(2_000), FieldValue::Varint(2)])
1069                .unwrap();
1070            batch_enc.finish()
1071        };
1072
1073        let mut raw = enc.into_raw_encoder();
1074        raw.write_raw(&raw_batch).unwrap();
1075        let combined = raw.into_inner();
1076
1077        let mut dec = Decoder::new(&combined).unwrap();
1078        let events: Vec<_> = dec
1079            .decode_all()
1080            .into_iter()
1081            .filter_map(|f| match f {
1082                DecodedFrame::Event {
1083                    timestamp_ns,
1084                    values,
1085                    ..
1086                } => Some((timestamp_ns, values)),
1087                _ => None,
1088            })
1089            .collect();
1090
1091        assert_eq!(events.len(), 2);
1092        assert_eq!(events[0].0, Some(1_000));
1093        assert_eq!(events[0].1, vec![FieldValue::Varint(1)]);
1094        assert_eq!(events[1].0, Some(2_000));
1095        assert_eq!(events[1].1, vec![FieldValue::Varint(2)]);
1096    }
1097}