Skip to main content

dial9_trace_format/
encoder.rs

1//! High-level encoder for writing trace files.
2//!
3//! [`Encoder`] writes the file header, registers schemas, interns strings, and
4//! encodes events with delta-compressed timestamps. It is the primary entry
5//! point for producing trace data.
6
7use crate::TraceEvent;
8use crate::codec::{self, PoolEntry, WireTypeId};
9use crate::schema::{SchemaEntry, SchemaRegistry};
10use crate::types::{CountingWriter, EncodeState, EventEncoder, InternedString};
11use std::any::TypeId;
12use std::collections::HashMap;
13use std::hash::{BuildHasherDefault, Hasher};
14use std::io::{self, Write};
15use std::sync::Arc;
16
/// A small, fast, non-cryptographic hasher in the style of FxHash.
///
/// Every absorbed word is folded into the state as
/// `(state.rotate_left(5) ^ word).wrapping_mul(SEED)`. It backs the maps in
/// this module (keys such as `TypeId` and `Arc<str>`) and makes no attempt to
/// resist adversarially chosen keys.
#[derive(Default)]
pub(crate) struct FxHasher(u64);

impl FxHasher {
    /// Multiplier applied after every rotate-xor step.
    const SEED: u64 = 0x517cc1b727220a95;

    /// Fold one 64-bit word into the running state.
    #[inline]
    fn mix(&mut self, word: u64) {
        self.0 = (self.0.rotate_left(5) ^ word).wrapping_mul(Self::SEED);
    }
}

impl Hasher for FxHasher {
    /// Absorb a byte slice one byte at a time.
    #[inline]
    fn write(&mut self, bytes: &[u8]) {
        bytes.iter().for_each(|&byte| self.mix(u64::from(byte)));
    }

    #[inline]
    fn write_u64(&mut self, i: u64) {
        self.mix(i);
    }

    #[inline]
    fn write_u32(&mut self, i: u32) {
        self.mix(u64::from(i));
    }

    #[inline]
    fn write_u16(&mut self, i: u16) {
        self.mix(u64::from(i));
    }

    #[inline]
    fn write_usize(&mut self, i: usize) {
        self.mix(i as u64);
    }

    /// Absorb a 128-bit value as two words: low half first, then high half.
    #[inline]
    fn write_u128(&mut self, i: u128) {
        let low = i as u64;
        let high = (i >> 64) as u64;
        self.mix(low);
        self.mix(high);
    }

    /// The current state is the hash; no finalisation step is applied.
    #[inline]
    fn finish(&self) -> u64 {
        self.0
    }
}

/// `BuildHasher` that produces default-initialised [`FxHasher`]s.
pub(crate) type FxBuildHasher = BuildHasherDefault<FxHasher>;
/// `HashMap` that hashes its keys with [`FxHasher`].
pub(crate) type FxHashMap<K, V> = HashMap<K, V, FxBuildHasher>;
66
67/// A schema handle returned by [`Encoder::register_schema`] or created via
68/// [`Schema::new`].
69///
70/// Carries the full schema definition (name + fields) so it can auto-register
71/// itself with any encoder on first use. This means a `Schema` created on one
72/// encoder can be passed to a different encoder and it will just work.
73///
74/// `Schema` is cheap to clone (internally `Arc`-backed).
75#[derive(Clone, Debug)]
76pub struct Schema {
77    pub(crate) entry: Arc<SchemaEntry>,
78    /// Pre-computed `Arc<str>` of the schema name, used as a cheap HashMap key
79    /// (clone is a pointer bump instead of a String allocation).
80    name_key: Arc<str>,
81}
82
83impl Schema {
84    /// Create a schema handle without an encoder.
85    ///
86    /// The schema will be lazily registered the first time it is passed to
87    /// [`Encoder::write_event`].
88    pub fn new(name: &str, fields: Vec<crate::schema::FieldDef>) -> Self {
89        let name_key: Arc<str> = Arc::from(name);
90        Self {
91            entry: Arc::new(SchemaEntry {
92                name: name.to_string(),
93                has_timestamp: true,
94                fields,
95            }),
96            name_key,
97        }
98    }
99
100    /// Schema name.
101    pub fn name(&self) -> &str {
102        &self.entry.name
103    }
104
105    /// Schema field definitions.
106    pub fn fields(&self) -> &[crate::schema::FieldDef] {
107        &self.entry.fields
108    }
109}
110
/// Lookup key for the encoder's schema-id cache.
///
/// A schema is found either by its name or by the `TypeId` of the Rust type
/// it was derived from.
#[derive(Clone, Hash, PartialEq, Eq)]
enum SchemaKey {
    /// Keyed by schema name (schemas registered through a [`Schema`] handle).
    Name(Arc<str>),
    /// Keyed by Rust type (events written through the typed `write` path).
    RustType(TypeId),
}
118
119/// Trace file encoder.
120///
121/// Writes the binary file header, registers event schemas, interns strings
122/// into a pool, and encodes events with delta-compressed timestamps.
123///
124/// The default type parameter (`Vec<u8>`) buffers everything in memory;
125/// use [`Encoder::new_to`] to write to an arbitrary [`Write`] sink.
126pub struct Encoder<W: Write = Vec<u8>> {
127    state: EncodeState<W>,
128    registry: SchemaRegistry,
129    string_pool: FxHashMap<String, u32>,
130    next_pool_id: u32,
131    schema_ids: FxHashMap<SchemaKey, WireTypeId>,
132}
133
134impl Default for Encoder<Vec<u8>> {
135    fn default() -> Self {
136        Self::new()
137    }
138}
139
140impl Encoder<Vec<u8>> {
141    pub fn new() -> Self {
142        let mut buf = Vec::new();
143        codec::encode_header(&mut buf).expect("Vec::write_all cannot fail");
144        Self {
145            state: EncodeState::new(buf),
146            registry: SchemaRegistry::new(),
147            string_pool: FxHashMap::default(),
148            next_pool_id: 0,
149            schema_ids: FxHashMap::default(),
150        }
151    }
152
153    /// Consume the encoder and return the encoded bytes.
154    pub fn finish(self) -> Vec<u8> {
155        self.state.writer.into_inner()
156    }
157}
158
159impl<W: Write> Encoder<W> {
160    /// Create an encoder that writes to an arbitrary writer.
161    /// Writes the file header immediately.
162    pub fn new_to(mut writer: W) -> io::Result<Self> {
163        codec::encode_header(&mut writer)?;
164        Ok(Self {
165            state: EncodeState::new(writer),
166            registry: SchemaRegistry::new(),
167            string_pool: FxHashMap::default(),
168            next_pool_id: 0,
169            schema_ids: FxHashMap::default(),
170        })
171    }
172
173    /// Create an encoder seeded from decoded state. Used by
174    /// [`Decoder::into_encoder`](crate::decoder::Decoder::into_encoder).
175    pub(crate) fn from_decoder(
176        mut registry: SchemaRegistry,
177        string_pool: crate::decoder::StringPool,
178        timestamp_base_ns: u64,
179        writer: W,
180    ) -> Self {
181        let mut pool = FxHashMap::default();
182        let mut next_pool_id: u32 = 0;
183        for (id, value) in string_pool.0.into_iter() {
184            pool.insert(value, id.raw_id());
185            if id.raw_id() >= next_pool_id {
186                next_pool_id = id.raw_id() + 1;
187            }
188        }
189
190        let mut schema_ids = FxHashMap::default();
191        for (wire_id, entry) in registry.entries() {
192            schema_ids.insert(SchemaKey::Name(Arc::from(entry.name.as_str())), wire_id);
193        }
194        registry.sync_next_id();
195
196        let mut state = EncodeState::new(writer);
197        state.set_ts_base_unchecked(timestamp_base_ns);
198
199        Self {
200            state,
201            registry,
202            string_pool: pool,
203            next_pool_id,
204            schema_ids,
205        }
206    }
207
208    /// Consume the encoder and return the inner writer.
209    pub fn into_inner(self) -> W {
210        self.state.writer.into_inner()
211    }
212
213    /// Borrow the inner writer.
214    pub fn as_inner(&self) -> &W {
215        self.state.writer.inner()
216    }
217
218    /// Total bytes written through this encoder (including the file header).
219    pub fn bytes_written(&self) -> u64 {
220        self.state.writer.bytes_written()
221    }
222
223    /// Reset the encoder to a new writer, preserving internal allocations.
224    /// Returns the old writer. Writes a file header to the new writer.
225    pub fn reset_to(&mut self, mut new_writer: W) -> io::Result<W> {
226        codec::encode_header(&mut new_writer)?;
227        self.string_pool.clear();
228        self.next_pool_id = 0;
229        self.registry.clear();
230        self.schema_ids.clear();
231        // creating a new EncodeState resets the timestamp delta
232        let old_state = std::mem::replace(&mut self.state, EncodeState::new(new_writer));
233        Ok(old_state.writer.into_inner())
234    }
235
236    /// Ensure a schema is registered with this encoder. Returns the wire type
237    /// ID for this encoder's output stream.
238    ///
239    /// Idempotent if the schema matches. Errors if a different schema was
240    /// already registered under the same name.
241    fn ensure_registered(&mut self, schema: &Schema) -> io::Result<WireTypeId> {
242        let key = SchemaKey::Name(Arc::clone(&schema.name_key));
243        if let Some(&wire_id) = self.schema_ids.get(&key) {
244            // TODO: unify registry and schema_ids to avoid this error case
245            let Some(existing) = self.registry.get(wire_id) else {
246                return Err(io::Error::other(format!(
247                    "corrupted internal state. {wire_id:?} in schema_ids but not in registry."
248                )));
249            };
250            if *existing == *schema.entry {
251                return Ok(wire_id);
252            }
253            return Err(io::Error::new(
254                io::ErrorKind::InvalidInput,
255                format!(
256                    "schema already registered with different definition: {}",
257                    schema.name()
258                ),
259            ));
260        }
261        let id = self.registry.next_type_id();
262        codec::encode_schema(id, &schema.entry, &mut self.state.writer)?;
263        self.registry
264            .register(id, (*schema.entry).clone())
265            .expect("schema registration failed");
266        self.schema_ids.insert(key, id);
267        Ok(id)
268    }
269
270    /// Register a schema by name. Returns a [`Schema`] handle that can be
271    /// passed to [`write_event`](Self::write_event) (on this or any other
272    /// encoder).
273    ///
274    /// All schemas have timestamps. When writing events, the first element of
275    /// `values` must be `FieldValue::Varint(timestamp_ns)`. It is extracted and
276    /// encoded in the event header (not as a regular field).
277    ///
278    /// Eagerly writes the schema frame. Idempotent if the definition matches.
279    pub fn register_schema(
280        &mut self,
281        name: &str,
282        fields: Vec<crate::schema::FieldDef>,
283    ) -> io::Result<Schema> {
284        let schema = Schema::new(name, fields);
285        self.ensure_registered(&schema)?;
286        Ok(schema)
287    }
288
289    /// Write an event for a schema.
290    ///
291    /// The first element of `values` must be `FieldValue::Varint(timestamp_ns)`
292    /// — it is extracted and encoded in the event header, not as a regular
293    /// field. The remaining values must match the schema's field count.
294    ///
295    /// If this encoder hasn't seen `schema` before, it is auto-registered
296    /// (the schema frame is written before the event).
297    pub fn write_event(
298        &mut self,
299        schema: &Schema,
300        values: &[crate::types::FieldValue],
301    ) -> io::Result<()> {
302        use crate::types::FieldValue;
303
304        let type_id = self.ensure_registered(schema)?;
305        let expected_fields = schema.entry.fields.len();
306
307        let ts_ns = match values.first() {
308            Some(FieldValue::Varint(ns)) => *ns,
309            _ => {
310                return Err(io::Error::new(
311                    io::ErrorKind::InvalidInput,
312                    "first value must be FieldValue::Varint(timestamp_ns)",
313                ));
314            }
315        };
316        let field_values = &values[1..];
317
318        if field_values.len() != expected_fields {
319            return Err(io::Error::new(
320                io::ErrorKind::InvalidInput,
321                format!(
322                    "value count ({}) does not match schema field count ({}) for schema '{}'",
323                    field_values.len(),
324                    expected_fields,
325                    schema.name(),
326                ),
327            ));
328        }
329
330        let ts_delta = self.state.encode_timestamp_delta(ts_ns)?;
331        self.state.writer.write_all(&[codec::TAG_EVENT])?;
332        self.state.writer.write_all(&type_id.0.to_le_bytes())?;
333        codec::encode_u24_le(ts_delta, &mut self.state.writer)?;
334        let mut enc = EventEncoder::new(&mut self.state);
335        for (i, v) in field_values.iter().enumerate() {
336            enc.write_field_value(v, schema.entry.fields[i].field_type)?;
337        }
338        Ok(())
339    }
340
341    /// Write a derived TraceEvent. Auto-registers the schema on first call for this type.
342    /// Handles timestamp encoding: emits TimestampReset if needed, packs u24 delta in header.
343    pub fn write<T: TraceEvent + 'static>(&mut self, event: &T) -> io::Result<()> {
344        let key = SchemaKey::RustType(TypeId::of::<T>());
345        let tid = if let Some(&cached) = self.schema_ids.get(&key) {
346            cached
347        } else {
348            let entry = T::schema_entry();
349            let schema = Schema::new(&entry.name, entry.fields);
350            let id = self.ensure_registered(&schema)?;
351            self.schema_ids.insert(key, id);
352            id
353        };
354        let ts_ns = event.timestamp();
355        let ts_delta = self.state.encode_timestamp_delta(ts_ns)?;
356        self.state.writer.write_all(&[codec::TAG_EVENT])?;
357        self.state.writer.write_all(&tid.0.to_le_bytes())?;
358        codec::encode_u24_le(ts_delta, &mut self.state.writer)?;
359        let mut enc = EventEncoder::new(&mut self.state);
360        event.encode_fields(&mut enc)
361    }
362
363    /// Intern a string, emitting a pool frame if new. Returns an [`InternedString`] handle.
364    pub fn intern_string(&mut self, s: &str) -> io::Result<InternedString> {
365        if let Some(&id) = self.string_pool.get(s) {
366            return Ok(InternedString(id));
367        }
368        let id = self.next_pool_id;
369        self.next_pool_id += 1;
370        self.string_pool.insert(s.to_string(), id);
371        codec::encode_string_pool(
372            &[PoolEntry {
373                pool_id: id,
374                data: s.as_bytes().to_vec(),
375            }],
376            &mut self.state.writer,
377        )?;
378        Ok(InternedString(id))
379    }
380
381    pub fn write_string_pool(&mut self, entries: &[PoolEntry]) -> io::Result<()> {
382        codec::encode_string_pool(entries, &mut self.state.writer)
383    }
384
385    /// Flush the underlying writer.
386    pub fn flush(&mut self) -> io::Result<()> {
387        self.state.writer.flush()
388    }
389
390    /// Convert this encoder into a [`RawEncoder`] that only supports writing
391    /// pre-encoded bytes. The byte count is preserved so rotation decisions
392    /// remain correct.
393    ///
394    /// Use this after writing any structured data (headers, segment metadata)
395    /// to switch to a raw-only mode for appending pre-encoded batches.
396    pub fn into_raw_encoder(self) -> RawEncoder<W> {
397        RawEncoder {
398            writer: self.state.writer,
399        }
400    }
401}
402
403/// A write-only encoder that accepts pre-encoded bytes.
404///
405/// Created by [`Encoder::into_raw_encoder`] after the file header and any
406/// structured metadata have been written. Carries no schema registry, string
407/// pool, or timestamp state — it simply forwards bytes to the underlying
408/// writer while tracking the total byte count.
409pub struct RawEncoder<W> {
410    writer: CountingWriter<W>,
411}
412
413impl<W: Write> RawEncoder<W> {
414    /// Write pre-encoded bytes to the underlying writer.
415    pub fn write_raw(&mut self, bytes: &[u8]) -> io::Result<()> {
416        self.writer.write_all(bytes)
417    }
418
419    /// Total bytes written (including bytes written by the [`Encoder`] before
420    /// conversion).
421    pub fn bytes_written(&self) -> u64 {
422        self.writer.bytes_written()
423    }
424
425    /// Flush the underlying writer.
426    pub fn flush(&mut self) -> io::Result<()> {
427        self.writer.flush()
428    }
429
430    /// Consume the raw encoder and return the inner writer.
431    pub fn into_inner(self) -> W {
432        self.writer.into_inner()
433    }
434}
435
436impl Encoder<Vec<u8>> {
437    pub fn write_infallible<T: TraceEvent + 'static>(&mut self, event: &T) {
438        self.write(event).expect("writing to Vec<u8> is infallible")
439    }
440
441    pub fn intern_string_infallible(&mut self, s: &str) -> InternedString {
442        self.intern_string(s)
443            .expect("interning into Vec<u8> is infallible")
444    }
445
446    /// Resets the encoder to point to a new backing Vec returning the old one
447    pub fn reset_to_infallible(&mut self, data: Vec<u8>) -> Vec<u8> {
448        self.reset_to(data)
449            .expect("writing to Vec<u8> is infallible")
450    }
451}
452
453#[cfg(test)]
454mod tests {
455    use super::*;
456    use crate::schema::FieldDef;
457    use crate::types::{FieldType, FieldValue};
458
459    #[test]
460    fn encoder_writes_header() {
461        let enc = Encoder::new();
462        let data = enc.finish();
463        assert_eq!(&data[..5], &[0x54, 0x52, 0x43, 0x00, 1]);
464    }
465
466    #[test]
467    fn encoder_register_and_write_event() {
468        let mut enc = Encoder::new();
469        let schema = enc
470            .register_schema(
471                "Ev",
472                vec![FieldDef {
473                    name: "v".into(),
474                    field_type: FieldType::Varint,
475                }],
476            )
477            .unwrap();
478        enc.write_event(
479            &schema,
480            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
481        )
482        .unwrap();
483        let data = enc.finish();
484        assert!(data.len() > 5);
485    }
486
487    #[test]
488    fn idempotent_re_registration() {
489        let mut enc = Encoder::new();
490        let fields = vec![FieldDef {
491            name: "v".into(),
492            field_type: FieldType::Varint,
493        }];
494        let _s1 = enc.register_schema("Ev", fields.clone()).unwrap();
495        let _s2 = enc.register_schema("Ev", fields).unwrap();
496        // Both succeed — same schema, same name
497    }
498
499    #[test]
500    fn re_registration_different_schema_errors() {
501        let mut enc = Encoder::new();
502        enc.register_schema(
503            "Ev",
504            vec![FieldDef {
505                name: "v".into(),
506                field_type: FieldType::Varint,
507            }],
508        )
509        .unwrap();
510        let result = enc.register_schema(
511            "Ev",
512            vec![FieldDef {
513                name: "different".into(),
514                field_type: FieldType::Bool,
515            }],
516        );
517        assert!(result.is_err());
518    }
519
520    #[test]
521    fn schema_auto_registers_on_write() {
522        use crate::decoder::{DecodedFrame, Decoder};
523
524        // Create a schema without an encoder
525        let schema = Schema::new(
526            "Lazy",
527            vec![FieldDef {
528                name: "v".into(),
529                field_type: FieldType::Varint,
530            }],
531        );
532
533        // Write to an encoder that hasn't seen this schema — auto-registers
534        let mut enc = Encoder::new();
535        enc.write_event(
536            &schema,
537            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
538        )
539        .unwrap();
540
541        let bytes = enc.finish();
542        let mut dec = Decoder::new(&bytes).unwrap();
543        let frames = dec.decode_all();
544        assert!(matches!(&frames[0], DecodedFrame::Schema(s) if s.name == "Lazy"));
545        if let DecodedFrame::Event { values, .. } = &frames[1] {
546            assert_eq!(*values, vec![FieldValue::Varint(42)]);
547        } else {
548            panic!("expected event");
549        }
550    }
551
552    #[test]
553    fn schema_portable_across_encoders() {
554        use crate::decoder::{DecodedFrame, Decoder};
555
556        let mut enc1 = Encoder::new();
557        let schema = enc1
558            .register_schema(
559                "Shared",
560                vec![FieldDef {
561                    name: "v".into(),
562                    field_type: FieldType::Varint,
563                }],
564            )
565            .unwrap();
566        enc1.write_event(&schema, &[FieldValue::Varint(1_000), FieldValue::Varint(1)])
567            .unwrap();
568
569        // Pass the same Schema to a different encoder
570        let mut enc2 = Encoder::new();
571        enc2.write_event(&schema, &[FieldValue::Varint(2_000), FieldValue::Varint(2)])
572            .unwrap();
573
574        // Both encoders produce valid output
575        for (enc, expected_val) in [(enc1, 1u64), (enc2, 2u64)] {
576            let bytes = enc.finish();
577            let mut dec = Decoder::new(&bytes).unwrap();
578            let frames = dec.decode_all();
579            let event = frames
580                .iter()
581                .find(|f| matches!(f, DecodedFrame::Event { .. }))
582                .unwrap();
583            if let DecodedFrame::Event { values, .. } = event {
584                assert_eq!(values[0], FieldValue::Varint(expected_val));
585            }
586        }
587    }
588
589    #[test]
590    fn encoder_intern_string_deduplicates() {
591        let mut enc = Encoder::new();
592        let id1 = enc.intern_string("hello").unwrap();
593        let id2 = enc.intern_string("hello").unwrap();
594        let id3 = enc.intern_string("world").unwrap();
595        assert_eq!(id1, id2);
596        assert_ne!(id1, id3);
597    }
598
599    #[test]
600    fn timestamp_round_trip() {
601        use crate::decoder::{DecodedFrame, Decoder};
602
603        let mut enc = Encoder::new();
604        let schema = enc
605            .register_schema(
606                "TS",
607                vec![FieldDef {
608                    name: "v".into(),
609                    field_type: FieldType::Varint,
610                }],
611            )
612            .unwrap();
613
614        let ts1 = 100_000u64;
615        let ts2 = 50_000u64;
616        let ts3 = 200_000_000u64;
617        let ts4 = 100_000_000u64;
618        enc.write_event(&schema, &[FieldValue::Varint(ts1), FieldValue::Varint(1)])
619            .unwrap();
620        enc.write_event(&schema, &[FieldValue::Varint(ts2), FieldValue::Varint(2)])
621            .unwrap();
622        enc.write_event(&schema, &[FieldValue::Varint(ts3), FieldValue::Varint(3)])
623            .unwrap();
624        enc.write_event(&schema, &[FieldValue::Varint(ts4), FieldValue::Varint(4)])
625            .unwrap();
626
627        let bytes = enc.finish();
628        let mut dec = Decoder::new(&bytes).unwrap();
629        let events: Vec<_> = dec
630            .decode_all()
631            .into_iter()
632            .filter_map(|f| match f {
633                DecodedFrame::Event {
634                    timestamp_ns,
635                    values,
636                    ..
637                } => Some((timestamp_ns, values)),
638                _ => None,
639            })
640            .collect();
641
642        assert_eq!(events.len(), 4);
643        assert_eq!(events[0].0, Some(ts1));
644        assert_eq!(events[0].1, vec![FieldValue::Varint(1)]);
645        assert_eq!(events[1].0, Some(ts2));
646        assert_eq!(events[1].1, vec![FieldValue::Varint(2)]);
647        assert_eq!(events[2].0, Some(ts3));
648        assert_eq!(events[2].1, vec![FieldValue::Varint(3)]);
649        assert_eq!(events[3].0, Some(ts4));
650        assert_eq!(events[3].1, vec![FieldValue::Varint(4)]);
651    }
652
653    #[test]
654    fn encoder_new_to_writer() {
655        let mut buf = Vec::new();
656        let enc = Encoder::new_to(&mut buf).unwrap();
657        drop(enc);
658        assert!(buf.len() >= 5);
659        assert_eq!(&buf[..5], &[0x54, 0x52, 0x43, 0x00, 1]);
660    }
661
662    #[test]
663    fn decoder_into_encoder_appends_without_header() {
664        use crate::decoder::{DecodedFrame, Decoder};
665
666        // Create a trace with a header, a schema, and an event
667        let mut enc = Encoder::new();
668        let schema = enc
669            .register_schema(
670                "Ev",
671                vec![FieldDef {
672                    name: "v".into(),
673                    field_type: FieldType::Varint,
674                }],
675            )
676            .unwrap();
677        enc.write_event(&schema, &[FieldValue::Varint(1_000), FieldValue::Varint(1)])
678            .unwrap();
679        let base = enc.finish();
680
681        // Decode all frames, then convert into an encoder that appends to output
682        let mut decoder = Decoder::new(&base).unwrap();
683        while decoder.next_frame_ref().ok().flatten().is_some() {}
684        let mut output = Vec::new();
685        let mut ext = decoder.into_encoder(&mut output);
686        // Schema "Ev" is already known — no duplicate schema frame emitted
687        ext.write_event(&schema, &[FieldValue::Varint(2_000), FieldValue::Varint(2)])
688            .unwrap();
689        drop(ext);
690
691        // Concatenate and decode
692        let mut combined = base.clone();
693        combined.extend_from_slice(&output);
694        let mut dec = Decoder::new(&combined).unwrap();
695        let events: Vec<_> = dec
696            .decode_all()
697            .into_iter()
698            .filter_map(|f| match f {
699                DecodedFrame::Event {
700                    timestamp_ns,
701                    values,
702                    ..
703                } => Some((timestamp_ns, values)),
704                _ => None,
705            })
706            .collect();
707        assert_eq!(events.len(), 2);
708        assert_eq!(events[0].0, Some(1_000));
709        assert_eq!(events[1].0, Some(2_000));
710    }
711
712    #[test]
713    fn decoder_into_encoder_deduplicates_interned_strings() {
714        use crate::decoder::{DecodedFrame, Decoder};
715
716        // Create a trace with an interned string
717        let mut enc = Encoder::new();
718        let id1 = enc.intern_string("hello").unwrap();
719        let base = enc.finish();
720
721        // Decode all frames, then convert into an encoder
722        let mut decoder = Decoder::new(&base).unwrap();
723        while decoder.next_frame_ref().ok().flatten().is_some() {}
724        let mut output = Vec::new();
725        let mut ext = decoder.into_encoder(&mut output);
726        // "hello" is already interned, should reuse the same ID
727        let id2 = ext.intern_string("hello").unwrap();
728        let id3 = ext.intern_string("world").unwrap();
729        drop(ext);
730
731        assert_eq!(id1, id2, "existing string should reuse pool ID");
732        assert_ne!(id2, id3);
733
734        // "hello" should not produce a new StringPool frame; "world" should
735        let mut combined = base.clone();
736        combined.extend_from_slice(&output);
737        let mut dec = Decoder::new(&combined).unwrap();
738        let frames = dec.decode_all();
739        let pool_frames: Vec<_> = frames
740            .iter()
741            .filter(|f| matches!(f, DecodedFrame::StringPool(_)))
742            .collect();
743        // One from the base trace ("hello"), one from extend ("world")
744        assert_eq!(pool_frames.len(), 2);
745    }
746
747    #[test]
748    fn register_and_write() {
749        use crate::decoder::{DecodedFrame, Decoder};
750
751        let mut enc = Encoder::new();
752        let schema = enc
753            .register_schema(
754                "MyEvent",
755                vec![
756                    FieldDef {
757                        name: "count".into(),
758                        field_type: FieldType::Varint,
759                    },
760                    FieldDef {
761                        name: "name".into(),
762                        field_type: FieldType::String,
763                    },
764                ],
765            )
766            .unwrap();
767
768        enc.write_event(
769            &schema,
770            &[
771                FieldValue::Varint(1_000_000),
772                FieldValue::Varint(42),
773                FieldValue::String("hello".into()),
774            ],
775        )
776        .unwrap();
777
778        let bytes = enc.finish();
779        let mut dec = Decoder::new(&bytes).unwrap();
780        let frames = dec.decode_all();
781        let events: Vec<_> = frames
782            .into_iter()
783            .filter_map(|f| match f {
784                DecodedFrame::Event {
785                    timestamp_ns,
786                    values,
787                    ..
788                } => Some((timestamp_ns, values)),
789                _ => None,
790            })
791            .collect();
792        assert_eq!(events.len(), 1);
793        assert_eq!(events[0].0, Some(1_000_000));
794        assert_eq!(events[0].1[0], FieldValue::Varint(42));
795        assert_eq!(events[0].1[1], FieldValue::String("hello".into()));
796    }
797
798    #[test]
799    fn register_conflict_errors() {
800        let mut enc = Encoder::new();
801        enc.register_schema(
802            "Ev",
803            vec![FieldDef {
804                name: "v".into(),
805                field_type: FieldType::Varint,
806            }],
807        )
808        .unwrap();
809        let result = enc.register_schema(
810            "Ev",
811            vec![FieldDef {
812                name: "other".into(),
813                field_type: FieldType::Bool,
814            }],
815        );
816        assert!(result.is_err());
817    }
818
819    #[test]
820    fn write_wrong_field_count_errors() {
821        let mut enc = Encoder::new();
822        let schema = enc
823            .register_schema(
824                "Ev",
825                vec![FieldDef {
826                    name: "v".into(),
827                    field_type: FieldType::Varint,
828                }],
829            )
830            .unwrap();
831        // Pass 3 values (ts + 2 fields) for a 1-field schema
832        let result = enc.write_event(
833            &schema,
834            &[
835                FieldValue::Varint(0),
836                FieldValue::Varint(1),
837                FieldValue::Varint(2),
838            ],
839        );
840        assert!(result.is_err());
841    }
842
843    /// Verify that the encoder advances the timestamp base after each event,
844    /// producing inter-event deltas rather than base-relative deltas.
845    #[test]
846    fn timestamp_base_advances_per_event() {
847        use crate::decoder::{DecodedFrame, Decoder};
848
849        let mut enc = Encoder::new();
850        let schema = enc
851            .register_schema(
852                "Ev",
853                vec![FieldDef {
854                    name: "v".into(),
855                    field_type: FieldType::Varint,
856                }],
857            )
858            .unwrap();
859
860        let ts1 = 12_000_000u64;
861        let ts2 = 24_000_000u64;
862        enc.write_event(&schema, &[FieldValue::Varint(ts1), FieldValue::Varint(1)])
863            .unwrap();
864        enc.write_event(&schema, &[FieldValue::Varint(ts2), FieldValue::Varint(2)])
865            .unwrap();
866
867        let bytes = enc.finish();
868
869        let reset_count = bytes.iter().filter(|&&b| b == 0x05).count();
870        assert_eq!(
871            reset_count, 0,
872            "base should advance per event, avoiding unnecessary resets"
873        );
874
875        let mut dec = Decoder::new(&bytes).unwrap();
876        let events: Vec<_> = dec
877            .decode_all()
878            .into_iter()
879            .filter_map(|f| match f {
880                DecodedFrame::Event { timestamp_ns, .. } => timestamp_ns,
881                _ => None,
882            })
883            .collect();
884        assert_eq!(events, vec![ts1, ts2]);
885    }
886
887    #[test]
888    fn reset_to_preserves_capacity() {
889        let mut enc = Encoder::new();
890        for i in 0..100 {
891            enc.intern_string(&format!("string_{}", i)).unwrap();
892        }
893        let cap_before = enc.string_pool.capacity();
894        let _bytes = enc.reset_to(Vec::new());
895        let cap_after = enc.string_pool.capacity();
896        assert_eq!(
897            cap_before, cap_after,
898            "string_pool capacity should be preserved after reset_to"
899        );
900    }
901
902    #[test]
903    fn reset_to_returns_old_data_and_clears_state() {
904        use crate::decoder::{DecodedFrame, Decoder};
905
906        let mut enc = Encoder::new();
907        let schema = enc
908            .register_schema(
909                "Ev",
910                vec![FieldDef {
911                    name: "v".into(),
912                    field_type: FieldType::Varint,
913                }],
914            )
915            .unwrap();
916        enc.write_event(
917            &schema,
918            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
919        )
920        .unwrap();
921        let _s = enc.intern_string("hello").unwrap();
922
923        let old_bytes_written = enc.bytes_written();
924        assert!(old_bytes_written > 0);
925
926        // --- reset ---
927        let old = enc.reset_to_infallible(Vec::new());
928
929        // Invariant 1: old writer contains the data we wrote (decodable)
930        let mut dec = Decoder::new(&old).unwrap();
931        let frames = dec.decode_all();
932        assert!(frames.iter().any(|f| matches!(f, DecodedFrame::Schema(_))));
933        assert!(
934            frames
935                .iter()
936                .any(|f| matches!(f, DecodedFrame::Event { .. }))
937        );
938        assert!(
939            frames
940                .iter()
941                .any(|f| matches!(f, DecodedFrame::StringPool(_)))
942        );
943
944        // Invariant 2: bytes_written resets to just the header size
945        assert!(
946            enc.bytes_written() < old_bytes_written,
947            "bytes_written should reset (got {} vs old {})",
948            enc.bytes_written(),
949            old_bytes_written
950        );
951
952        // Invariant 3: schemas are cleared — same schema must re-register
953        // (write_event auto-registers, so we verify a new schema frame appears)
954        enc.write_event(
955            &schema,
956            &[FieldValue::Varint(2_000), FieldValue::Varint(99)],
957        )
958        .unwrap();
959
960        // Invariant 4: string pool is cleared — re-interning emits a new pool frame
961        let _s2 = enc.intern_string("hello").unwrap();
962
963        // Invariant 5: new output is a valid standalone trace
964        let new_bytes = enc.reset_to_infallible(Vec::new());
965        let mut dec2 = Decoder::new(&new_bytes).unwrap();
966        let new_frames = dec2.decode_all();
967        // Must have its own schema definition (not relying on old encoder state)
968        assert!(
969            new_frames
970                .iter()
971                .any(|f| matches!(f, DecodedFrame::Schema(s) if s.name == "Ev")),
972            "new trace must contain schema definition"
973        );
974        // Must have its own string pool entry
975        assert!(
976            new_frames
977                .iter()
978                .any(|f| matches!(f, DecodedFrame::StringPool(_))),
979            "new trace must contain string pool"
980        );
981        // Event must decode with correct timestamp (timestamp_base was reset)
982        let event = new_frames
983            .iter()
984            .find_map(|f| match f {
985                DecodedFrame::Event {
986                    timestamp_ns,
987                    values,
988                    ..
989                } => Some((timestamp_ns, values)),
990                _ => None,
991            })
992            .expect("new trace must contain event");
993        assert_eq!(*event.0, Some(2_000));
994        assert_eq!(event.1[0], FieldValue::Varint(99));
995    }
996
997    #[test]
998    fn into_raw_encoder_preserves_byte_count() {
999        let mut enc = Encoder::new();
1000        let schema = enc
1001            .register_schema(
1002                "Ev",
1003                vec![FieldDef {
1004                    name: "v".into(),
1005                    field_type: FieldType::Varint,
1006                }],
1007            )
1008            .unwrap();
1009        enc.write_event(
1010            &schema,
1011            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
1012        )
1013        .unwrap();
1014
1015        let bytes_before = enc.bytes_written();
1016        assert!(bytes_before > 0);
1017
1018        let raw = enc.into_raw_encoder();
1019        assert_eq!(
1020            raw.bytes_written(),
1021            bytes_before,
1022            "byte count must be preserved across conversion"
1023        );
1024    }
1025
1026    #[test]
1027    fn raw_encoder_write_raw_and_bytes_written() {
1028        let enc = Encoder::new();
1029        let initial = enc.bytes_written();
1030        let mut raw = enc.into_raw_encoder();
1031
1032        let payload = [0xAA; 100];
1033        raw.write_raw(&payload).unwrap();
1034
1035        assert_eq!(
1036            raw.bytes_written(),
1037            initial + payload.len() as u64,
1038            "bytes_written must include raw payload"
1039        );
1040    }
1041
1042    #[test]
1043    fn raw_encoder_into_inner_returns_all_data() {
1044        use crate::decoder::{DecodedFrame, Decoder};
1045
1046        // Write a structured event via Encoder, then append a raw batch
1047        // via RawEncoder, and verify the combined output decodes correctly.
1048        let mut enc = Encoder::new();
1049        let schema = enc
1050            .register_schema(
1051                "Ev",
1052                vec![FieldDef {
1053                    name: "v".into(),
1054                    field_type: FieldType::Varint,
1055                }],
1056            )
1057            .unwrap();
1058        enc.write_event(&schema, &[FieldValue::Varint(1_000), FieldValue::Varint(1)])
1059            .unwrap();
1060
1061        // Build a raw batch with the same schema
1062        let raw_batch = {
1063            let mut batch_enc = Encoder::new();
1064            batch_enc
1065                .write_event(&schema, &[FieldValue::Varint(2_000), FieldValue::Varint(2)])
1066                .unwrap();
1067            batch_enc.finish()
1068        };
1069
1070        let mut raw = enc.into_raw_encoder();
1071        raw.write_raw(&raw_batch).unwrap();
1072        let combined = raw.into_inner();
1073
1074        let mut dec = Decoder::new(&combined).unwrap();
1075        let events: Vec<_> = dec
1076            .decode_all()
1077            .into_iter()
1078            .filter_map(|f| match f {
1079                DecodedFrame::Event {
1080                    timestamp_ns,
1081                    values,
1082                    ..
1083                } => Some((timestamp_ns, values)),
1084                _ => None,
1085            })
1086            .collect();
1087
1088        assert_eq!(events.len(), 2);
1089        assert_eq!(events[0].0, Some(1_000));
1090        assert_eq!(events[0].1, vec![FieldValue::Varint(1)]);
1091        assert_eq!(events[1].0, Some(2_000));
1092        assert_eq!(events[1].1, vec![FieldValue::Varint(2)]);
1093    }
1094}