1use crate::TraceEvent;
8use crate::codec::{self, PoolEntry, StackPoolEntry, WireTypeId};
9use crate::schema::{SchemaEntry, SchemaRegistry};
10use crate::types::{
11 CountingWriter, EncodeState, EventEncoder, InternedStackFrames, InternedString,
12};
13use std::any::TypeId;
14use std::collections::HashMap;
15use std::hash::{BuildHasherDefault, Hasher};
16use std::io::{self, Write};
17use std::sync::Arc;
18
/// Small, fast, non-cryptographic hasher used for the encoder's internal maps.
///
/// The state is a single `u64`. Each absorbed word is mixed in by rotating the
/// state, xor-ing the word, and multiplying by a fixed odd constant. Byte slices
/// are read in native-endian order, so hash values are not portable across
/// platforms with different endianness.
#[doc(hidden)]
#[derive(Default)]
pub struct FxHasher(u64);

impl FxHasher {
    /// Multiplier applied (with wrapping) after every absorbed word.
    const MULTIPLIER: u64 = 0x517cc1b727220a95;
    /// Left rotation applied to the running state before a word is xor-ed in.
    const ROTATION: u32 = 5;

    /// Folds one 64-bit word into the running state.
    #[inline]
    fn hash_word(&mut self, word: u64) {
        let rotated = self.0.rotate_left(Self::ROTATION);
        self.0 = (rotated ^ word).wrapping_mul(Self::MULTIPLIER);
    }
}

impl Hasher for FxHasher {
    /// Absorbs `bytes`: as many 8-byte words as fit, then at most one 4-byte
    /// word, then the remaining bytes one at a time.
    #[inline]
    fn write(&mut self, bytes: &[u8]) {
        let mut words = bytes.chunks_exact(8);
        for chunk in words.by_ref() {
            let mut word = [0u8; 8];
            word.copy_from_slice(chunk);
            self.hash_word(u64::from_ne_bytes(word));
        }

        // Fewer than eight bytes are left at this point.
        let leftover = words.remainder();
        let leftover = if leftover.len() >= 4 {
            let (half, rest) = leftover.split_at(4);
            let mut half_word = [0u8; 4];
            half_word.copy_from_slice(half);
            self.hash_word(u64::from(u32::from_ne_bytes(half_word)));
            rest
        } else {
            leftover
        };

        for &byte in leftover {
            self.hash_word(u64::from(byte));
        }
    }

    /// Absorbs a `u8` as one zero-extended word.
    #[inline]
    fn write_u8(&mut self, i: u8) {
        self.hash_word(u64::from(i));
    }

    /// Absorbs a `u16` as one zero-extended word.
    #[inline]
    fn write_u16(&mut self, i: u16) {
        self.hash_word(u64::from(i));
    }

    /// Absorbs a `u32` as one zero-extended word.
    #[inline]
    fn write_u32(&mut self, i: u32) {
        self.hash_word(u64::from(i));
    }

    /// Absorbs a `u64` as a single word.
    #[inline]
    fn write_u64(&mut self, i: u64) {
        self.hash_word(i);
    }

    /// Absorbs a `usize` as one word.
    #[inline]
    fn write_usize(&mut self, i: usize) {
        self.hash_word(i as u64);
    }

    /// Absorbs a `u128` as two words: the low 64 bits first, then the high 64 bits.
    #[inline]
    fn write_u128(&mut self, i: u128) {
        let low = i as u64; // intentional truncation to the low half
        let high = (i >> 64) as u64;
        self.hash_word(low);
        self.hash_word(high);
    }

    /// Returns the current state unchanged; no finalization step is applied.
    #[inline]
    fn finish(&self) -> u64 {
        self.0
    }
}
86
87#[doc(hidden)]
88pub type FxBuildHasher = BuildHasherDefault<FxHasher>;
89#[doc(hidden)]
90pub type FxHashMap<K, V> = HashMap<K, V, FxBuildHasher>;
91
92#[derive(Clone, Debug)]
101pub struct Schema {
102 pub(crate) entry: Arc<SchemaEntry>,
103 name_key: Arc<str>,
106}
107
108impl Schema {
109 pub fn new(name: &str, fields: Vec<crate::schema::FieldDef>) -> Self {
114 let name_key: Arc<str> = Arc::from(name);
115 Self {
116 entry: Arc::new(SchemaEntry {
117 name: name.to_string(),
118 has_timestamp: true,
119 fields,
120 }),
121 name_key,
122 }
123 }
124
125 pub fn name(&self) -> &str {
127 &self.entry.name
128 }
129
130 pub fn fields(&self) -> &[crate::schema::FieldDef] {
132 &self.entry.fields
133 }
134}
135
/// Key under which the encoder remembers the wire type id assigned to a schema.
#[derive(Clone, PartialEq, Eq, Hash)]
enum SchemaKey {
    /// Schema identified by its name.
    Name(Arc<str>),
    /// Schema identified by the Rust type of the event that produced it.
    RustType(TypeId),
}
143
144pub struct Encoder<W: Write = Vec<u8>> {
152 state: EncodeState<W>,
153 registry: SchemaRegistry,
154 string_pool: FxHashMap<String, u32>,
155 next_pool_id: u32,
156 stack_pool: FxHashMap<Box<[u64]>, u32>,
157 next_stack_pool_id: u32,
158 schema_ids: FxHashMap<SchemaKey, WireTypeId>,
159}
160
161impl Default for Encoder<Vec<u8>> {
162 fn default() -> Self {
163 Self::new()
164 }
165}
166
167impl Encoder<Vec<u8>> {
168 pub fn new() -> Self {
169 let mut buf = Vec::new();
170 codec::encode_header(&mut buf).expect("Vec::write_all cannot fail");
171 Self {
172 state: EncodeState::new(buf),
173 registry: SchemaRegistry::new(),
174 string_pool: FxHashMap::default(),
175 next_pool_id: 0,
176 stack_pool: FxHashMap::default(),
177 next_stack_pool_id: 0,
178 schema_ids: FxHashMap::default(),
179 }
180 }
181
182 pub fn finish(self) -> Vec<u8> {
184 self.state.writer.into_inner()
185 }
186}
187
188impl<W: Write> Encoder<W> {
189 pub fn new_to(mut writer: W) -> io::Result<Self> {
192 codec::encode_header(&mut writer)?;
193 Ok(Self {
194 state: EncodeState::new(writer),
195 registry: SchemaRegistry::new(),
196 string_pool: FxHashMap::default(),
197 next_pool_id: 0,
198 stack_pool: FxHashMap::default(),
199 next_stack_pool_id: 0,
200 schema_ids: FxHashMap::default(),
201 })
202 }
203
204 pub(crate) fn from_decoder(
207 mut registry: SchemaRegistry,
208 string_pool: crate::decoder::StringPool,
209 stack_pool: crate::decoder::StackPool,
210 timestamp_base_ns: u64,
211 writer: W,
212 ) -> Self {
213 let mut pool = FxHashMap::default();
214 let mut next_pool_id: u32 = 0;
215 for (id, value) in string_pool.0.into_iter() {
216 pool.insert(value, id.raw_id());
217 if id.raw_id() >= next_pool_id {
218 next_pool_id = id.raw_id() + 1;
219 }
220 }
221
222 let mut new_stack_pool: FxHashMap<Box<[u64]>, u32> = FxHashMap::default();
223 let mut next_stack_pool_id: u32 = 0;
224 for (id, frames) in stack_pool.0.into_iter() {
225 new_stack_pool.insert(frames.into_boxed_slice(), id.raw_id());
226 if id.raw_id() >= next_stack_pool_id {
227 next_stack_pool_id = id.raw_id() + 1;
228 }
229 }
230
231 let mut schema_ids = FxHashMap::default();
232 for (wire_id, entry) in registry.entries() {
233 schema_ids.insert(SchemaKey::Name(Arc::from(entry.name.as_str())), wire_id);
234 }
235 registry.sync_next_id();
236
237 let mut state = EncodeState::new(writer);
238 state.set_ts_base_unchecked(timestamp_base_ns);
239
240 Self {
241 state,
242 registry,
243 string_pool: pool,
244 next_pool_id,
245 stack_pool: new_stack_pool,
246 next_stack_pool_id,
247 schema_ids,
248 }
249 }
250
251 pub fn into_inner(self) -> W {
253 self.state.writer.into_inner()
254 }
255
256 pub fn as_inner(&self) -> &W {
258 self.state.writer.inner()
259 }
260
261 pub fn bytes_written(&self) -> u64 {
263 self.state.writer.bytes_written()
264 }
265
266 pub fn reset_to(&mut self, mut new_writer: W) -> io::Result<W> {
269 codec::encode_header(&mut new_writer)?;
270 self.string_pool.clear();
271 self.next_pool_id = 0;
272 self.stack_pool.clear();
273 self.next_stack_pool_id = 0;
274 self.registry.clear();
275 self.schema_ids.clear();
276 let old_state = std::mem::replace(&mut self.state, EncodeState::new(new_writer));
278 Ok(old_state.writer.into_inner())
279 }
280
281 fn ensure_registered(&mut self, schema: &Schema) -> io::Result<WireTypeId> {
287 let key = SchemaKey::Name(Arc::clone(&schema.name_key));
288 if let Some(&wire_id) = self.schema_ids.get(&key) {
289 let Some(existing) = self.registry.get(wire_id) else {
291 return Err(io::Error::other(format!(
292 "corrupted internal state. {wire_id:?} in schema_ids but not in registry."
293 )));
294 };
295 if *existing == *schema.entry {
296 return Ok(wire_id);
297 }
298 return Err(io::Error::new(
299 io::ErrorKind::InvalidInput,
300 format!(
301 "schema already registered with different definition: {}",
302 schema.name()
303 ),
304 ));
305 }
306 let id = self.registry.next_type_id();
307 codec::encode_schema(id, &schema.entry, &mut self.state.writer)?;
308 self.registry
309 .register(id, (*schema.entry).clone())
310 .expect("schema registration failed");
311 self.schema_ids.insert(key, id);
312 Ok(id)
313 }
314
315 pub fn register_schema(
325 &mut self,
326 name: &str,
327 fields: Vec<crate::schema::FieldDef>,
328 ) -> io::Result<Schema> {
329 let schema = Schema::new(name, fields);
330 self.ensure_registered(&schema)?;
331 Ok(schema)
332 }
333
334 pub fn write_event(
343 &mut self,
344 schema: &Schema,
345 values: &[crate::types::FieldValue],
346 ) -> io::Result<()> {
347 use crate::types::FieldValue;
348
349 let type_id = self.ensure_registered(schema)?;
350 let expected_fields = schema.entry.fields.len();
351
352 let ts_ns = match values.first() {
353 Some(FieldValue::Varint(ns)) => *ns,
354 _ => {
355 return Err(io::Error::new(
356 io::ErrorKind::InvalidInput,
357 "first value must be FieldValue::Varint(timestamp_ns)",
358 ));
359 }
360 };
361 let field_values = &values[1..];
362
363 if field_values.len() != expected_fields {
364 return Err(io::Error::new(
365 io::ErrorKind::InvalidInput,
366 format!(
367 "value count ({}) does not match schema field count ({}) for schema '{}'",
368 field_values.len(),
369 expected_fields,
370 schema.name(),
371 ),
372 ));
373 }
374
375 let ts_delta = self.state.encode_timestamp_delta(ts_ns)?;
376 self.state.writer.write_all(&[codec::TAG_EVENT])?;
377 self.state.writer.write_all(&type_id.0.to_le_bytes())?;
378 codec::encode_u24_le(ts_delta, &mut self.state.writer)?;
379 let mut enc = EventEncoder::new(&mut self.state);
380 for (i, v) in field_values.iter().enumerate() {
381 enc.write_field_value(v, schema.entry.fields[i].field_type)?;
382 }
383 Ok(())
384 }
385
386 pub fn write<T: TraceEvent + 'static>(&mut self, event: &T) -> io::Result<()> {
389 let key = SchemaKey::RustType(TypeId::of::<T>());
390 let tid = if let Some(&cached) = self.schema_ids.get(&key) {
391 cached
392 } else {
393 let entry = T::schema_entry();
394 let schema = Schema::new(&entry.name, entry.fields);
395 let id = self.ensure_registered(&schema)?;
396 self.schema_ids.insert(key, id);
397 id
398 };
399 let ts_ns = event.timestamp();
400 let ts_delta = self.state.encode_timestamp_delta(ts_ns)?;
401 self.state.writer.write_all(&[codec::TAG_EVENT])?;
402 self.state.writer.write_all(&tid.0.to_le_bytes())?;
403 codec::encode_u24_le(ts_delta, &mut self.state.writer)?;
404 let mut enc = EventEncoder::new(&mut self.state);
405 event.encode_fields(&mut enc)
406 }
407
408 pub fn intern_string(&mut self, s: &str) -> io::Result<InternedString> {
410 if let Some(&id) = self.string_pool.get(s) {
411 return Ok(InternedString(id));
412 }
413 let id = self.next_pool_id;
414 self.next_pool_id += 1;
415 self.string_pool.insert(s.to_string(), id);
416 codec::encode_string_pool(
417 &[PoolEntry {
418 pool_id: id,
419 data: s.as_bytes().to_vec(),
420 }],
421 &mut self.state.writer,
422 )?;
423 Ok(InternedString(id))
424 }
425
426 pub fn write_string_pool(&mut self, entries: &[PoolEntry]) -> io::Result<()> {
427 codec::encode_string_pool(entries, &mut self.state.writer)
428 }
429
430 pub fn intern_stack_frames(&mut self, frames: &[u64]) -> io::Result<InternedStackFrames> {
433 if let Some(&id) = self.stack_pool.get(frames) {
434 return Ok(InternedStackFrames(id));
435 }
436 let id = self.next_stack_pool_id;
437 self.next_stack_pool_id += 1;
438 self.stack_pool.insert(frames.into(), id);
439 codec::encode_stack_pool(
440 &[StackPoolEntry {
441 pool_id: id,
442 frames: frames.to_vec(),
443 }],
444 &mut self.state.writer,
445 )?;
446 Ok(InternedStackFrames(id))
447 }
448
449 pub fn write_stack_pool(&mut self, entries: &[StackPoolEntry]) -> io::Result<()> {
450 codec::encode_stack_pool(entries, &mut self.state.writer)
451 }
452
453 pub fn flush(&mut self) -> io::Result<()> {
455 self.state.writer.flush()
456 }
457
458 pub fn into_raw_encoder(self) -> RawEncoder<W> {
465 RawEncoder {
466 writer: self.state.writer,
467 }
468 }
469}
470
471pub struct RawEncoder<W> {
478 writer: CountingWriter<W>,
479}
480
481impl<W: Write> RawEncoder<W> {
482 pub fn write_raw(&mut self, bytes: &[u8]) -> io::Result<()> {
484 self.writer.write_all(bytes)
485 }
486
487 pub fn bytes_written(&self) -> u64 {
490 self.writer.bytes_written()
491 }
492
493 pub fn flush(&mut self) -> io::Result<()> {
495 self.writer.flush()
496 }
497
498 pub fn into_inner(self) -> W {
500 self.writer.into_inner()
501 }
502}
503
504impl Encoder<Vec<u8>> {
505 pub fn write_infallible<T: TraceEvent + 'static>(&mut self, event: &T) {
506 self.write(event).expect("writing to Vec<u8> is infallible")
507 }
508
509 pub fn intern_string_infallible(&mut self, s: &str) -> InternedString {
510 self.intern_string(s)
511 .expect("interning into Vec<u8> is infallible")
512 }
513
514 pub fn intern_stack_frames_infallible(&mut self, frames: &[u64]) -> InternedStackFrames {
515 self.intern_stack_frames(frames)
516 .expect("interning into Vec<u8> is infallible")
517 }
518
519 pub fn reset_to_infallible(&mut self, data: Vec<u8>) -> Vec<u8> {
521 self.reset_to(data)
522 .expect("writing to Vec<u8> is infallible")
523 }
524}
525
526#[cfg(test)]
527mod tests {
528 use super::*;
529 use crate::schema::FieldDef;
530 use crate::types::{FieldType, FieldValue};
531
532 #[test]
533 fn encoder_writes_header() {
534 let enc = Encoder::new();
535 let data = enc.finish();
536 assert_eq!(&data[..5], &[0x54, 0x52, 0x43, 0x00, 1]);
537 }
538
539 #[test]
540 fn encoder_register_and_write_event() {
541 let mut enc = Encoder::new();
542 let schema = enc
543 .register_schema(
544 "Ev",
545 vec![FieldDef {
546 name: "v".into(),
547 field_type: FieldType::Varint,
548 }],
549 )
550 .unwrap();
551 enc.write_event(
552 &schema,
553 &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
554 )
555 .unwrap();
556 let data = enc.finish();
557 assert!(data.len() > 5);
558 }
559
560 #[test]
561 fn idempotent_re_registration() {
562 let mut enc = Encoder::new();
563 let fields = vec![FieldDef {
564 name: "v".into(),
565 field_type: FieldType::Varint,
566 }];
567 let _s1 = enc.register_schema("Ev", fields.clone()).unwrap();
568 let _s2 = enc.register_schema("Ev", fields).unwrap();
569 }
571
572 #[test]
573 fn re_registration_different_schema_errors() {
574 let mut enc = Encoder::new();
575 enc.register_schema(
576 "Ev",
577 vec![FieldDef {
578 name: "v".into(),
579 field_type: FieldType::Varint,
580 }],
581 )
582 .unwrap();
583 let result = enc.register_schema(
584 "Ev",
585 vec![FieldDef {
586 name: "different".into(),
587 field_type: FieldType::Bool,
588 }],
589 );
590 assert!(result.is_err());
591 }
592
593 #[test]
594 fn schema_auto_registers_on_write() {
595 use crate::decoder::{DecodedFrame, Decoder};
596
597 let schema = Schema::new(
599 "Lazy",
600 vec![FieldDef {
601 name: "v".into(),
602 field_type: FieldType::Varint,
603 }],
604 );
605
606 let mut enc = Encoder::new();
608 enc.write_event(
609 &schema,
610 &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
611 )
612 .unwrap();
613
614 let bytes = enc.finish();
615 let mut dec = Decoder::new(&bytes).unwrap();
616 let frames = dec.decode_all();
617 assert!(matches!(&frames[0], DecodedFrame::Schema(s) if s.name == "Lazy"));
618 if let DecodedFrame::Event { values, .. } = &frames[1] {
619 assert_eq!(*values, vec![FieldValue::Varint(42)]);
620 } else {
621 panic!("expected event");
622 }
623 }
624
625 #[test]
626 fn schema_portable_across_encoders() {
627 use crate::decoder::{DecodedFrame, Decoder};
628
629 let mut enc1 = Encoder::new();
630 let schema = enc1
631 .register_schema(
632 "Shared",
633 vec![FieldDef {
634 name: "v".into(),
635 field_type: FieldType::Varint,
636 }],
637 )
638 .unwrap();
639 enc1.write_event(&schema, &[FieldValue::Varint(1_000), FieldValue::Varint(1)])
640 .unwrap();
641
642 let mut enc2 = Encoder::new();
644 enc2.write_event(&schema, &[FieldValue::Varint(2_000), FieldValue::Varint(2)])
645 .unwrap();
646
647 for (enc, expected_val) in [(enc1, 1u64), (enc2, 2u64)] {
649 let bytes = enc.finish();
650 let mut dec = Decoder::new(&bytes).unwrap();
651 let frames = dec.decode_all();
652 let event = frames
653 .iter()
654 .find(|f| matches!(f, DecodedFrame::Event { .. }))
655 .unwrap();
656 if let DecodedFrame::Event { values, .. } = event {
657 assert_eq!(values[0], FieldValue::Varint(expected_val));
658 }
659 }
660 }
661
662 #[test]
663 fn encoder_intern_string_deduplicates() {
664 let mut enc = Encoder::new();
665 let id1 = enc.intern_string("hello").unwrap();
666 let id2 = enc.intern_string("hello").unwrap();
667 let id3 = enc.intern_string("world").unwrap();
668 assert_eq!(id1, id2);
669 assert_ne!(id1, id3);
670 }
671
672 #[test]
673 fn encoder_intern_stack_frames_deduplicates() {
674 let mut enc = Encoder::new();
675 let stack_a: &[u64] = &[0x1000, 0x2000, 0x3000];
676 let stack_b: &[u64] = &[0x4000, 0x5000];
677 let id1 = enc.intern_stack_frames(stack_a).unwrap();
678 let id2 = enc.intern_stack_frames(stack_a).unwrap();
679 let id3 = enc.intern_stack_frames(stack_b).unwrap();
680 assert_eq!(id1, id2);
681 assert_ne!(id1, id3);
682 }
683
684 #[test]
685 fn stack_pool_round_trip_via_decoder() {
686 use crate::decoder::Decoder;
687 use crate::types::InternedStackFrames;
688
689 let mut enc = Encoder::new();
690 let stack_a: &[u64] = &[0xdead, 0xbeef, 0xcafe];
691 let stack_b: &[u64] = &[0x1, 0x2];
692 let id_a = enc.intern_stack_frames(stack_a).unwrap();
693 let id_b = enc.intern_stack_frames(stack_b).unwrap();
694 let bytes = enc.finish();
695
696 let mut dec = Decoder::new(&bytes).unwrap();
697 let _ = dec.decode_all();
698 assert_eq!(
699 dec.stack_pool().get(InternedStackFrames(id_a.raw_id())),
700 Some(stack_a)
701 );
702 assert_eq!(
703 dec.stack_pool().get(InternedStackFrames(id_b.raw_id())),
704 Some(stack_b)
705 );
706 }
707
708 #[test]
709 fn for_each_event_populates_stack_pool() {
710 use crate::decoder::Decoder;
711 use crate::schema::FieldDef;
712 use crate::types::{FieldType, FieldValue, InternedStackFrames};
713
714 let mut enc = Encoder::new();
715 let schema = enc
716 .register_schema(
717 "CpuSampleEvent",
718 vec![FieldDef {
719 name: "callchain".into(),
720 field_type: FieldType::PooledStackFrames,
721 }],
722 )
723 .unwrap();
724 let stack: &[u64] = &[0x1234, 0x5678, 0x9abc];
725 let id = enc.intern_stack_frames(stack).unwrap();
726 enc.write_event(
727 &schema,
728 &[
729 FieldValue::Varint(1_000_000),
730 FieldValue::PooledStackFrames(id),
731 ],
732 )
733 .unwrap();
734 let bytes = enc.finish();
735
736 let mut dec = Decoder::new(&bytes).unwrap();
737 let mut event_count = 0;
738 dec.for_each_event(|_ev| {
739 event_count += 1;
740 })
741 .unwrap();
742 assert_eq!(event_count, 1);
743 assert_eq!(
744 dec.stack_pool().get(InternedStackFrames(id.raw_id())),
745 Some(stack),
746 );
747 }
748
749 #[test]
750 fn encoder_intern_empty_stack_frames() {
751 use crate::decoder::Decoder;
752 use crate::types::InternedStackFrames;
753
754 let mut enc = Encoder::new();
755 let id1 = enc.intern_stack_frames(&[]).unwrap();
756 let id2 = enc.intern_stack_frames(&[]).unwrap();
757 assert_eq!(id1, id2);
758 let bytes = enc.finish();
759
760 let mut dec = Decoder::new(&bytes).unwrap();
761 let _ = dec.decode_all();
762 assert_eq!(
763 dec.stack_pool().get(InternedStackFrames(id1.raw_id())),
764 Some(&[][..])
765 );
766 }
767
768 #[test]
769 fn write_stack_pool_multi_entry_round_trip() {
770 use crate::decoder::Decoder;
771 use crate::types::InternedStackFrames;
772
773 let mut enc = Encoder::new();
774 let entries = vec![
775 StackPoolEntry {
776 pool_id: 0,
777 frames: vec![0xaaaa, 0xbbbb, 0xcccc],
778 },
779 StackPoolEntry {
780 pool_id: 1,
781 frames: vec![0x1111],
782 },
783 StackPoolEntry {
784 pool_id: 2,
785 frames: vec![],
786 },
787 ];
788 enc.write_stack_pool(&entries).unwrap();
789 let bytes = enc.finish();
790
791 let mut dec = Decoder::new(&bytes).unwrap();
792 let _ = dec.decode_all();
793 assert_eq!(
794 dec.stack_pool().get(InternedStackFrames(0)),
795 Some(&[0xaaaa, 0xbbbb, 0xcccc][..])
796 );
797 assert_eq!(
798 dec.stack_pool().get(InternedStackFrames(1)),
799 Some(&[0x1111][..])
800 );
801 assert_eq!(dec.stack_pool().get(InternedStackFrames(2)), Some(&[][..]));
802 }
803
804 #[test]
805 fn decoder_into_encoder_deduplicates_interned_stack_frames() {
806 use crate::decoder::Decoder;
807
808 let mut enc = Encoder::new();
809 let id1 = enc.intern_stack_frames(&[0x10, 0x20]).unwrap();
810 let base = enc.finish();
811
812 let mut decoder = Decoder::new(&base).unwrap();
813 while decoder.next_frame_ref().ok().flatten().is_some() {}
814 let mut output = Vec::new();
815 let mut ext = decoder.into_encoder(&mut output);
816 let id2 = ext.intern_stack_frames(&[0x10, 0x20]).unwrap();
817 let id3 = ext.intern_stack_frames(&[0x30]).unwrap();
818 assert_eq!(id1.raw_id(), id2.raw_id());
819 assert_ne!(id2.raw_id(), id3.raw_id());
820 }
821
822 #[test]
823 fn timestamp_round_trip() {
824 use crate::decoder::{DecodedFrame, Decoder};
825
826 let mut enc = Encoder::new();
827 let schema = enc
828 .register_schema(
829 "TS",
830 vec![FieldDef {
831 name: "v".into(),
832 field_type: FieldType::Varint,
833 }],
834 )
835 .unwrap();
836
837 let ts1 = 100_000u64;
838 let ts2 = 50_000u64;
839 let ts3 = 200_000_000u64;
840 let ts4 = 100_000_000u64;
841 enc.write_event(&schema, &[FieldValue::Varint(ts1), FieldValue::Varint(1)])
842 .unwrap();
843 enc.write_event(&schema, &[FieldValue::Varint(ts2), FieldValue::Varint(2)])
844 .unwrap();
845 enc.write_event(&schema, &[FieldValue::Varint(ts3), FieldValue::Varint(3)])
846 .unwrap();
847 enc.write_event(&schema, &[FieldValue::Varint(ts4), FieldValue::Varint(4)])
848 .unwrap();
849
850 let bytes = enc.finish();
851 let mut dec = Decoder::new(&bytes).unwrap();
852 let events: Vec<_> = dec
853 .decode_all()
854 .into_iter()
855 .filter_map(|f| match f {
856 DecodedFrame::Event {
857 timestamp_ns,
858 values,
859 ..
860 } => Some((timestamp_ns, values)),
861 _ => None,
862 })
863 .collect();
864
865 assert_eq!(events.len(), 4);
866 assert_eq!(events[0].0, Some(ts1));
867 assert_eq!(events[0].1, vec![FieldValue::Varint(1)]);
868 assert_eq!(events[1].0, Some(ts2));
869 assert_eq!(events[1].1, vec![FieldValue::Varint(2)]);
870 assert_eq!(events[2].0, Some(ts3));
871 assert_eq!(events[2].1, vec![FieldValue::Varint(3)]);
872 assert_eq!(events[3].0, Some(ts4));
873 assert_eq!(events[3].1, vec![FieldValue::Varint(4)]);
874 }
875
876 #[test]
877 fn encoder_new_to_writer() {
878 let mut buf = Vec::new();
879 let enc = Encoder::new_to(&mut buf).unwrap();
880 drop(enc);
881 assert!(buf.len() >= 5);
882 assert_eq!(&buf[..5], &[0x54, 0x52, 0x43, 0x00, 1]);
883 }
884
885 #[test]
886 fn decoder_into_encoder_appends_without_header() {
887 use crate::decoder::{DecodedFrame, Decoder};
888
889 let mut enc = Encoder::new();
891 let schema = enc
892 .register_schema(
893 "Ev",
894 vec![FieldDef {
895 name: "v".into(),
896 field_type: FieldType::Varint,
897 }],
898 )
899 .unwrap();
900 enc.write_event(&schema, &[FieldValue::Varint(1_000), FieldValue::Varint(1)])
901 .unwrap();
902 let base = enc.finish();
903
904 let mut decoder = Decoder::new(&base).unwrap();
906 while decoder.next_frame_ref().ok().flatten().is_some() {}
907 let mut output = Vec::new();
908 let mut ext = decoder.into_encoder(&mut output);
909 ext.write_event(&schema, &[FieldValue::Varint(2_000), FieldValue::Varint(2)])
911 .unwrap();
912 drop(ext);
913
914 let mut combined = base.clone();
916 combined.extend_from_slice(&output);
917 let mut dec = Decoder::new(&combined).unwrap();
918 let events: Vec<_> = dec
919 .decode_all()
920 .into_iter()
921 .filter_map(|f| match f {
922 DecodedFrame::Event {
923 timestamp_ns,
924 values,
925 ..
926 } => Some((timestamp_ns, values)),
927 _ => None,
928 })
929 .collect();
930 assert_eq!(events.len(), 2);
931 assert_eq!(events[0].0, Some(1_000));
932 assert_eq!(events[1].0, Some(2_000));
933 }
934
935 #[test]
936 fn decoder_into_encoder_deduplicates_interned_strings() {
937 use crate::decoder::{DecodedFrame, Decoder};
938
939 let mut enc = Encoder::new();
941 let id1 = enc.intern_string("hello").unwrap();
942 let base = enc.finish();
943
944 let mut decoder = Decoder::new(&base).unwrap();
946 while decoder.next_frame_ref().ok().flatten().is_some() {}
947 let mut output = Vec::new();
948 let mut ext = decoder.into_encoder(&mut output);
949 let id2 = ext.intern_string("hello").unwrap();
951 let id3 = ext.intern_string("world").unwrap();
952 drop(ext);
953
954 assert_eq!(id1, id2, "existing string should reuse pool ID");
955 assert_ne!(id2, id3);
956
957 let mut combined = base.clone();
959 combined.extend_from_slice(&output);
960 let mut dec = Decoder::new(&combined).unwrap();
961 let frames = dec.decode_all();
962 let pool_frames: Vec<_> = frames
963 .iter()
964 .filter(|f| matches!(f, DecodedFrame::StringPool(_)))
965 .collect();
966 assert_eq!(pool_frames.len(), 2);
968 }
969
970 #[test]
971 fn register_and_write() {
972 use crate::decoder::{DecodedFrame, Decoder};
973
974 let mut enc = Encoder::new();
975 let schema = enc
976 .register_schema(
977 "MyEvent",
978 vec![
979 FieldDef {
980 name: "count".into(),
981 field_type: FieldType::Varint,
982 },
983 FieldDef {
984 name: "name".into(),
985 field_type: FieldType::String,
986 },
987 ],
988 )
989 .unwrap();
990
991 enc.write_event(
992 &schema,
993 &[
994 FieldValue::Varint(1_000_000),
995 FieldValue::Varint(42),
996 FieldValue::String("hello".into()),
997 ],
998 )
999 .unwrap();
1000
1001 let bytes = enc.finish();
1002 let mut dec = Decoder::new(&bytes).unwrap();
1003 let frames = dec.decode_all();
1004 let events: Vec<_> = frames
1005 .into_iter()
1006 .filter_map(|f| match f {
1007 DecodedFrame::Event {
1008 timestamp_ns,
1009 values,
1010 ..
1011 } => Some((timestamp_ns, values)),
1012 _ => None,
1013 })
1014 .collect();
1015 assert_eq!(events.len(), 1);
1016 assert_eq!(events[0].0, Some(1_000_000));
1017 assert_eq!(events[0].1[0], FieldValue::Varint(42));
1018 assert_eq!(events[0].1[1], FieldValue::String("hello".into()));
1019 }
1020
1021 #[test]
1022 fn register_conflict_errors() {
1023 let mut enc = Encoder::new();
1024 enc.register_schema(
1025 "Ev",
1026 vec![FieldDef {
1027 name: "v".into(),
1028 field_type: FieldType::Varint,
1029 }],
1030 )
1031 .unwrap();
1032 let result = enc.register_schema(
1033 "Ev",
1034 vec![FieldDef {
1035 name: "other".into(),
1036 field_type: FieldType::Bool,
1037 }],
1038 );
1039 assert!(result.is_err());
1040 }
1041
1042 #[test]
1043 fn write_wrong_field_count_errors() {
1044 let mut enc = Encoder::new();
1045 let schema = enc
1046 .register_schema(
1047 "Ev",
1048 vec![FieldDef {
1049 name: "v".into(),
1050 field_type: FieldType::Varint,
1051 }],
1052 )
1053 .unwrap();
1054 let result = enc.write_event(
1056 &schema,
1057 &[
1058 FieldValue::Varint(0),
1059 FieldValue::Varint(1),
1060 FieldValue::Varint(2),
1061 ],
1062 );
1063 assert!(result.is_err());
1064 }
1065
1066 #[test]
1069 fn timestamp_base_advances_per_event() {
1070 use crate::decoder::{DecodedFrame, Decoder};
1071
1072 let mut enc = Encoder::new();
1073 let schema = enc
1074 .register_schema(
1075 "Ev",
1076 vec![FieldDef {
1077 name: "v".into(),
1078 field_type: FieldType::Varint,
1079 }],
1080 )
1081 .unwrap();
1082
1083 let ts1 = 12_000_000u64;
1084 let ts2 = 24_000_000u64;
1085 enc.write_event(&schema, &[FieldValue::Varint(ts1), FieldValue::Varint(1)])
1086 .unwrap();
1087 enc.write_event(&schema, &[FieldValue::Varint(ts2), FieldValue::Varint(2)])
1088 .unwrap();
1089
1090 let bytes = enc.finish();
1091
1092 let reset_count = bytes.iter().filter(|&&b| b == 0x05).count();
1093 assert_eq!(
1094 reset_count, 0,
1095 "base should advance per event, avoiding unnecessary resets"
1096 );
1097
1098 let mut dec = Decoder::new(&bytes).unwrap();
1099 let events: Vec<_> = dec
1100 .decode_all()
1101 .into_iter()
1102 .filter_map(|f| match f {
1103 DecodedFrame::Event { timestamp_ns, .. } => timestamp_ns,
1104 _ => None,
1105 })
1106 .collect();
1107 assert_eq!(events, vec![ts1, ts2]);
1108 }
1109
1110 #[test]
1111 fn reset_to_preserves_capacity() {
1112 let mut enc = Encoder::new();
1113 for i in 0..100 {
1114 enc.intern_string(&format!("string_{}", i)).unwrap();
1115 }
1116 let cap_before = enc.string_pool.capacity();
1117 let _bytes = enc.reset_to(Vec::new());
1118 let cap_after = enc.string_pool.capacity();
1119 assert_eq!(
1120 cap_before, cap_after,
1121 "string_pool capacity should be preserved after reset_to"
1122 );
1123 }
1124
1125 #[test]
1126 fn reset_to_returns_old_data_and_clears_state() {
1127 use crate::decoder::{DecodedFrame, Decoder};
1128
1129 let mut enc = Encoder::new();
1130 let schema = enc
1131 .register_schema(
1132 "Ev",
1133 vec![FieldDef {
1134 name: "v".into(),
1135 field_type: FieldType::Varint,
1136 }],
1137 )
1138 .unwrap();
1139 enc.write_event(
1140 &schema,
1141 &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
1142 )
1143 .unwrap();
1144 let _s = enc.intern_string("hello").unwrap();
1145
1146 let old_bytes_written = enc.bytes_written();
1147 assert!(old_bytes_written > 0);
1148
1149 let old = enc.reset_to_infallible(Vec::new());
1151
1152 let mut dec = Decoder::new(&old).unwrap();
1154 let frames = dec.decode_all();
1155 assert!(frames.iter().any(|f| matches!(f, DecodedFrame::Schema(_))));
1156 assert!(
1157 frames
1158 .iter()
1159 .any(|f| matches!(f, DecodedFrame::Event { .. }))
1160 );
1161 assert!(
1162 frames
1163 .iter()
1164 .any(|f| matches!(f, DecodedFrame::StringPool(_)))
1165 );
1166
1167 assert!(
1169 enc.bytes_written() < old_bytes_written,
1170 "bytes_written should reset (got {} vs old {})",
1171 enc.bytes_written(),
1172 old_bytes_written
1173 );
1174
1175 enc.write_event(
1178 &schema,
1179 &[FieldValue::Varint(2_000), FieldValue::Varint(99)],
1180 )
1181 .unwrap();
1182
1183 let _s2 = enc.intern_string("hello").unwrap();
1185
1186 let new_bytes = enc.reset_to_infallible(Vec::new());
1188 let mut dec2 = Decoder::new(&new_bytes).unwrap();
1189 let new_frames = dec2.decode_all();
1190 assert!(
1192 new_frames
1193 .iter()
1194 .any(|f| matches!(f, DecodedFrame::Schema(s) if s.name == "Ev")),
1195 "new trace must contain schema definition"
1196 );
1197 assert!(
1199 new_frames
1200 .iter()
1201 .any(|f| matches!(f, DecodedFrame::StringPool(_))),
1202 "new trace must contain string pool"
1203 );
1204 let event = new_frames
1206 .iter()
1207 .find_map(|f| match f {
1208 DecodedFrame::Event {
1209 timestamp_ns,
1210 values,
1211 ..
1212 } => Some((timestamp_ns, values)),
1213 _ => None,
1214 })
1215 .expect("new trace must contain event");
1216 assert_eq!(*event.0, Some(2_000));
1217 assert_eq!(event.1[0], FieldValue::Varint(99));
1218 }
1219
1220 #[test]
1221 fn into_raw_encoder_preserves_byte_count() {
1222 let mut enc = Encoder::new();
1223 let schema = enc
1224 .register_schema(
1225 "Ev",
1226 vec![FieldDef {
1227 name: "v".into(),
1228 field_type: FieldType::Varint,
1229 }],
1230 )
1231 .unwrap();
1232 enc.write_event(
1233 &schema,
1234 &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
1235 )
1236 .unwrap();
1237
1238 let bytes_before = enc.bytes_written();
1239 assert!(bytes_before > 0);
1240
1241 let raw = enc.into_raw_encoder();
1242 assert_eq!(
1243 raw.bytes_written(),
1244 bytes_before,
1245 "byte count must be preserved across conversion"
1246 );
1247 }
1248
1249 #[test]
1250 fn raw_encoder_write_raw_and_bytes_written() {
1251 let enc = Encoder::new();
1252 let initial = enc.bytes_written();
1253 let mut raw = enc.into_raw_encoder();
1254
1255 let payload = [0xAA; 100];
1256 raw.write_raw(&payload).unwrap();
1257
1258 assert_eq!(
1259 raw.bytes_written(),
1260 initial + payload.len() as u64,
1261 "bytes_written must include raw payload"
1262 );
1263 }
1264
1265 #[test]
1266 fn raw_encoder_into_inner_returns_all_data() {
1267 use crate::decoder::{DecodedFrame, Decoder};
1268
1269 let mut enc = Encoder::new();
1272 let schema = enc
1273 .register_schema(
1274 "Ev",
1275 vec![FieldDef {
1276 name: "v".into(),
1277 field_type: FieldType::Varint,
1278 }],
1279 )
1280 .unwrap();
1281 enc.write_event(&schema, &[FieldValue::Varint(1_000), FieldValue::Varint(1)])
1282 .unwrap();
1283
1284 let raw_batch = {
1286 let mut batch_enc = Encoder::new();
1287 batch_enc
1288 .write_event(&schema, &[FieldValue::Varint(2_000), FieldValue::Varint(2)])
1289 .unwrap();
1290 batch_enc.finish()
1291 };
1292
1293 let mut raw = enc.into_raw_encoder();
1294 raw.write_raw(&raw_batch).unwrap();
1295 let combined = raw.into_inner();
1296
1297 let mut dec = Decoder::new(&combined).unwrap();
1298 let events: Vec<_> = dec
1299 .decode_all()
1300 .into_iter()
1301 .filter_map(|f| match f {
1302 DecodedFrame::Event {
1303 timestamp_ns,
1304 values,
1305 ..
1306 } => Some((timestamp_ns, values)),
1307 _ => None,
1308 })
1309 .collect();
1310
1311 assert_eq!(events.len(), 2);
1312 assert_eq!(events[0].0, Some(1_000));
1313 assert_eq!(events[0].1, vec![FieldValue::Varint(1)]);
1314 assert_eq!(events[1].0, Some(2_000));
1315 assert_eq!(events[1].1, vec![FieldValue::Varint(2)]);
1316 }
1317}