1use crate::TraceEvent;
8use crate::codec::{self, PoolEntry, StackPoolEntry, WireTypeId};
9use crate::schema::{SchemaEntry, SchemaRegistry};
10use crate::types::{
11 CountingWriter, EncodeState, EventEncoder, InternedStackFrames, InternedString,
12};
13use std::any::TypeId;
14use std::collections::HashMap;
15use std::hash::{BuildHasherDefault, Hasher};
16use std::io::{self, Write};
17use std::sync::Arc;
18
/// Small, fast, non-cryptographic hasher backing the encoder's lookup tables.
///
/// The whole state is one `u64`. Every input word is folded in by rotating the
/// state left by five bits, XOR-ing the word and multiplying by a fixed odd
/// constant (wrapping on overflow).
#[doc(hidden)]
#[derive(Default)]
pub struct FxHasher(u64);

impl FxHasher {
    /// Multiplier applied after each word has been XOR-ed into the state.
    const MULTIPLIER: u64 = 0x517cc1b727220a95;

    /// Folds a single 64-bit word into the running state.
    #[inline]
    fn hash_word(&mut self, word: u64) {
        let mixed = self.0.rotate_left(5) ^ word;
        self.0 = mixed.wrapping_mul(Self::MULTIPLIER);
    }
}

impl Hasher for FxHasher {
    /// Hashes a byte slice: native-endian 8-byte words first, then at most one
    /// native-endian 4-byte word, then any remaining bytes one at a time.
    #[inline]
    fn write(&mut self, bytes: &[u8]) {
        let mut words = bytes.chunks_exact(8);
        for chunk in words.by_ref() {
            let word: [u8; 8] = chunk.try_into().unwrap();
            self.hash_word(u64::from_ne_bytes(word));
        }

        // Fewer than eight bytes are left at this point.
        let mut tail = words.remainder();
        if tail.len() >= 4 {
            let (half, rest) = tail.split_at(4);
            let half: [u8; 4] = half.try_into().unwrap();
            self.hash_word(u64::from(u32::from_ne_bytes(half)));
            tail = rest;
        }
        tail.iter()
            .for_each(|&byte| self.hash_word(u64::from(byte)));
    }

    #[inline]
    fn write_u8(&mut self, i: u8) {
        self.hash_word(u64::from(i));
    }

    #[inline]
    fn write_u16(&mut self, i: u16) {
        self.hash_word(u64::from(i));
    }

    #[inline]
    fn write_u32(&mut self, i: u32) {
        self.hash_word(u64::from(i));
    }

    #[inline]
    fn write_u64(&mut self, i: u64) {
        self.hash_word(i);
    }

    #[inline]
    fn write_usize(&mut self, i: usize) {
        self.hash_word(i as u64);
    }

    /// Hashes the low 64 bits first, then the high 64 bits.
    #[inline]
    fn write_u128(&mut self, i: u128) {
        let low = i as u64;
        let high = (i >> 64) as u64;
        self.hash_word(low);
        self.hash_word(high);
    }

    /// Returns the current state unchanged; there is no finalisation step.
    #[inline]
    fn finish(&self) -> u64 {
        self.0
    }
}

/// `BuildHasher` that produces default-initialised [`FxHasher`]s.
#[doc(hidden)]
pub type FxBuildHasher = BuildHasherDefault<FxHasher>;
/// `HashMap` that hashes its keys with [`FxHasher`].
#[doc(hidden)]
pub type FxHashMap<K, V> = HashMap<K, V, FxBuildHasher>;
91
92#[derive(Clone, Debug)]
101pub struct Schema {
102 pub(crate) entry: Arc<SchemaEntry>,
103 name_key: Arc<str>,
106}
107
108impl Schema {
109 pub fn new(name: &str, fields: Vec<crate::schema::FieldDef>) -> Self {
114 let name_key: Arc<str> = Arc::from(name);
115 Self {
116 entry: Arc::new(SchemaEntry {
117 name: name.to_string(),
118 has_timestamp: true,
119 fields,
120 annotations: Vec::new(),
121 }),
122 name_key,
123 }
124 }
125
126 pub fn from_entry(entry: SchemaEntry) -> Self {
128 let name_key: Arc<str> = Arc::from(entry.name.as_str());
129 Self {
130 entry: Arc::new(entry),
131 name_key,
132 }
133 }
134
135 pub fn name(&self) -> &str {
137 &self.entry.name
138 }
139
140 pub fn fields(&self) -> &[crate::schema::FieldDef] {
142 &self.entry.fields
143 }
144}
145
/// Key under which the encoder caches the wire type ID assigned to a schema.
#[derive(Clone, PartialEq, Eq, Hash)]
enum SchemaKey {
    /// Schema looked up by name; usually a clone of the `Arc<str>` held by a
    /// [`Schema`] handle.
    Name(Arc<str>),
    /// Schema looked up by the `TypeId` of the Rust event type passed to
    /// `Encoder::write`.
    RustType(TypeId),
}
153
/// Streaming trace encoder that writes schema, pool and event frames to `W`.
///
/// Besides the output writer it tracks which schemas, strings and stack
/// traces have already been assigned IDs, so each is defined only once.
pub struct Encoder<W: Write = Vec<u8>> {
    /// Output writer plus the state used to encode timestamp deltas.
    state: EncodeState<W>,
    /// Schemas registered so far, addressable by wire type ID.
    registry: SchemaRegistry,
    /// Interned strings mapped to their pool ID.
    string_pool: FxHashMap<String, u32>,
    /// Pool ID handed to the next newly interned string.
    next_pool_id: u32,
    /// Interned stack traces (frame lists) mapped to their pool ID.
    stack_pool: FxHashMap<Box<[u64]>, u32>,
    /// Pool ID handed to the next newly interned stack trace.
    next_stack_pool_id: u32,
    /// Wire type IDs cached by schema name and by Rust event type.
    schema_ids: FxHashMap<SchemaKey, WireTypeId>,
}
170
171impl Default for Encoder<Vec<u8>> {
172 fn default() -> Self {
173 Self::new()
174 }
175}
176
177impl Encoder<Vec<u8>> {
178 pub fn new() -> Self {
179 let mut buf = Vec::new();
180 codec::encode_header(&mut buf).expect("Vec::write_all cannot fail");
181 Self {
182 state: EncodeState::new(buf),
183 registry: SchemaRegistry::new(),
184 string_pool: FxHashMap::default(),
185 next_pool_id: 0,
186 stack_pool: FxHashMap::default(),
187 next_stack_pool_id: 0,
188 schema_ids: FxHashMap::default(),
189 }
190 }
191
192 pub fn finish(self) -> Vec<u8> {
194 self.state.writer.into_inner()
195 }
196}
197
198impl<W: Write> Encoder<W> {
199 pub fn new_to(mut writer: W) -> io::Result<Self> {
202 codec::encode_header(&mut writer)?;
203 Ok(Self {
204 state: EncodeState::new(writer),
205 registry: SchemaRegistry::new(),
206 string_pool: FxHashMap::default(),
207 next_pool_id: 0,
208 stack_pool: FxHashMap::default(),
209 next_stack_pool_id: 0,
210 schema_ids: FxHashMap::default(),
211 })
212 }
213
214 pub(crate) fn from_decoder(
217 mut registry: SchemaRegistry,
218 string_pool: crate::decoder::StringPool,
219 stack_pool: crate::decoder::StackPool,
220 timestamp_base_ns: u64,
221 writer: W,
222 ) -> Self {
223 let mut pool = FxHashMap::default();
224 let mut next_pool_id: u32 = 0;
225 for (id, value) in string_pool.0.into_iter() {
226 pool.insert(value, id.raw_id());
227 if id.raw_id() >= next_pool_id {
228 next_pool_id = id.raw_id() + 1;
229 }
230 }
231
232 let mut new_stack_pool: FxHashMap<Box<[u64]>, u32> = FxHashMap::default();
233 let mut next_stack_pool_id: u32 = 0;
234 for (id, frames) in stack_pool.0.into_iter() {
235 new_stack_pool.insert(frames.into_boxed_slice(), id.raw_id());
236 if id.raw_id() >= next_stack_pool_id {
237 next_stack_pool_id = id.raw_id() + 1;
238 }
239 }
240
241 let mut schema_ids = FxHashMap::default();
242 for (wire_id, entry) in registry.entries() {
243 schema_ids.insert(SchemaKey::Name(Arc::from(entry.name.as_str())), wire_id);
244 }
245 registry.sync_next_id();
246
247 let mut state = EncodeState::new(writer);
248 state.set_ts_base_unchecked(timestamp_base_ns);
249
250 Self {
251 state,
252 registry,
253 string_pool: pool,
254 next_pool_id,
255 stack_pool: new_stack_pool,
256 next_stack_pool_id,
257 schema_ids,
258 }
259 }
260
261 pub fn into_inner(self) -> W {
263 self.state.writer.into_inner()
264 }
265
266 pub fn as_inner(&self) -> &W {
268 self.state.writer.inner()
269 }
270
271 pub fn bytes_written(&self) -> u64 {
273 self.state.writer.bytes_written()
274 }
275
276 pub fn reset_to(&mut self, mut new_writer: W) -> io::Result<W> {
279 codec::encode_header(&mut new_writer)?;
280 self.string_pool.clear();
281 self.next_pool_id = 0;
282 self.stack_pool.clear();
283 self.next_stack_pool_id = 0;
284 self.registry.clear();
285 self.schema_ids.clear();
286 let old_state = std::mem::replace(&mut self.state, EncodeState::new(new_writer));
288 Ok(old_state.writer.into_inner())
289 }
290
291 fn ensure_registered(&mut self, schema: &Schema) -> io::Result<WireTypeId> {
297 let key = SchemaKey::Name(Arc::clone(&schema.name_key));
298 if let Some(&wire_id) = self.schema_ids.get(&key) {
299 let Some(existing) = self.registry.get(wire_id) else {
301 return Err(io::Error::other(format!(
302 "corrupted internal state. {wire_id:?} in schema_ids but not in registry."
303 )));
304 };
305 if *existing == *schema.entry {
306 return Ok(wire_id);
307 }
308 return Err(io::Error::new(
309 io::ErrorKind::InvalidInput,
310 format!(
311 "schema already registered with different definition: {}",
312 schema.name()
313 ),
314 ));
315 }
316 let id = self.registry.next_type_id();
317 codec::encode_schema(id, &schema.entry, &mut self.state.writer)?;
318 if !schema.entry.annotations.is_empty() {
319 codec::encode_schema_annotations(
320 id,
321 &schema.entry.annotations,
322 &mut self.state.writer,
323 )?;
324 }
325 self.registry
326 .register(id, (*schema.entry).clone())
327 .expect("schema registration failed");
328 self.schema_ids.insert(key, id);
329 Ok(id)
330 }
331
332 pub fn register_schema(
342 &mut self,
343 name: &str,
344 fields: Vec<crate::schema::FieldDef>,
345 ) -> io::Result<Schema> {
346 let schema = Schema::new(name, fields);
347 self.ensure_registered(&schema)?;
348 Ok(schema)
349 }
350
351 pub fn register_existing(&mut self, schema: &Schema) -> io::Result<WireTypeId> {
356 self.ensure_registered(schema)
357 }
358
359 pub fn write_event(
368 &mut self,
369 schema: &Schema,
370 values: &[crate::types::FieldValue],
371 ) -> io::Result<()> {
372 use crate::types::FieldValue;
373
374 let type_id = self.ensure_registered(schema)?;
375 let expected_fields = schema.entry.fields.len();
376
377 let ts_ns = match values.first() {
378 Some(FieldValue::Varint(ns)) => *ns,
379 _ => {
380 return Err(io::Error::new(
381 io::ErrorKind::InvalidInput,
382 "first value must be FieldValue::Varint(timestamp_ns)",
383 ));
384 }
385 };
386 let field_values = &values[1..];
387
388 if field_values.len() != expected_fields {
389 return Err(io::Error::new(
390 io::ErrorKind::InvalidInput,
391 format!(
392 "value count ({}) does not match schema field count ({}) for schema '{}'",
393 field_values.len(),
394 expected_fields,
395 schema.name(),
396 ),
397 ));
398 }
399
400 let ts_delta = self.state.encode_timestamp_delta(ts_ns)?;
401 self.state.writer.write_all(&[codec::TAG_EVENT])?;
402 self.state.writer.write_all(&type_id.0.to_le_bytes())?;
403 codec::encode_u24_le(ts_delta, &mut self.state.writer)?;
404 let mut enc = EventEncoder::new(&mut self.state);
405 for (i, v) in field_values.iter().enumerate() {
406 enc.write_field_value(v, schema.entry.fields[i].field_type)?;
407 }
408 Ok(())
409 }
410
411 pub fn write<T: TraceEvent + 'static>(&mut self, event: &T) -> io::Result<()> {
414 let key = SchemaKey::RustType(TypeId::of::<T>());
415 let tid = if let Some(&cached) = self.schema_ids.get(&key) {
416 cached
417 } else {
418 let entry = T::schema_entry();
419 let schema = Schema::new(&entry.name, entry.fields);
420 let id = self.ensure_registered(&schema)?;
421 self.schema_ids.insert(key, id);
422 id
423 };
424 let ts_ns = event.timestamp();
425 let ts_delta = self.state.encode_timestamp_delta(ts_ns)?;
426 self.state.writer.write_all(&[codec::TAG_EVENT])?;
427 self.state.writer.write_all(&tid.0.to_le_bytes())?;
428 codec::encode_u24_le(ts_delta, &mut self.state.writer)?;
429 let mut enc = EventEncoder::new(&mut self.state);
430 event.encode_fields(&mut enc)
431 }
432
433 pub fn intern_string(&mut self, s: &str) -> io::Result<InternedString> {
435 if let Some(&id) = self.string_pool.get(s) {
436 return Ok(InternedString(id));
437 }
438 let id = self.next_pool_id;
439 self.next_pool_id += 1;
440 self.string_pool.insert(s.to_string(), id);
441 codec::encode_string_pool(
442 &[PoolEntry {
443 pool_id: id,
444 data: s.as_bytes().to_vec(),
445 }],
446 &mut self.state.writer,
447 )?;
448 Ok(InternedString(id))
449 }
450
451 pub fn write_string_pool(&mut self, entries: &[PoolEntry]) -> io::Result<()> {
452 codec::encode_string_pool(entries, &mut self.state.writer)
453 }
454
455 pub fn intern_stack_frames(&mut self, frames: &[u64]) -> io::Result<InternedStackFrames> {
458 if let Some(&id) = self.stack_pool.get(frames) {
459 return Ok(InternedStackFrames(id));
460 }
461 let id = self.next_stack_pool_id;
462 self.next_stack_pool_id += 1;
463 self.stack_pool.insert(frames.into(), id);
464 codec::encode_stack_pool(
465 &[StackPoolEntry {
466 pool_id: id,
467 frames: frames.to_vec(),
470 }],
471 &mut self.state.writer,
472 )?;
473 Ok(InternedStackFrames(id))
474 }
475
476 pub fn write_stack_pool(&mut self, entries: &[StackPoolEntry]) -> io::Result<()> {
477 codec::encode_stack_pool(entries, &mut self.state.writer)
478 }
479
480 pub fn flush(&mut self) -> io::Result<()> {
482 self.state.writer.flush()
483 }
484
485 pub fn into_raw_encoder(self) -> RawEncoder<W> {
492 RawEncoder {
493 writer: self.state.writer,
494 }
495 }
496}
497
/// Writer for pre-encoded bytes, obtained from `Encoder::into_raw_encoder`.
///
/// It keeps the encoder's counting writer, so the byte count continues from
/// where the encoder left off, but it performs no schema or pool bookkeeping.
pub struct RawEncoder<W> {
    /// Counting writer taken over from the encoder.
    writer: CountingWriter<W>,
}

impl<W: Write> RawEncoder<W> {
    /// Writes `bytes` to the output unchanged.
    ///
    /// The bytes are not inspected or validated here.
    pub fn write_raw(&mut self, bytes: &[u8]) -> io::Result<()> {
        self.writer.write_all(bytes)
    }

    /// Returns the byte count reported by the counting writer.
    pub fn bytes_written(&self) -> u64 {
        self.writer.bytes_written()
    }

    /// Flushes the underlying writer.
    pub fn flush(&mut self) -> io::Result<()> {
        self.writer.flush()
    }

    /// Consumes the raw encoder and returns the underlying writer.
    pub fn into_inner(self) -> W {
        self.writer.into_inner()
    }
}
530
/// Convenience wrappers for the in-memory encoder that unwrap the
/// `io::Result` of the corresponding fallible method.
impl Encoder<Vec<u8>> {
    /// Like `write`, but panics instead of returning an error.
    ///
    /// # Panics
    ///
    /// Panics if `write` returns an error.
    pub fn write_infallible<T: TraceEvent + 'static>(&mut self, event: &T) {
        self.write(event).expect("writing to Vec<u8> is infallible")
    }

    /// Like `intern_string`, but panics instead of returning an error.
    ///
    /// # Panics
    ///
    /// Panics if `intern_string` returns an error.
    pub fn intern_string_infallible(&mut self, s: &str) -> InternedString {
        self.intern_string(s)
            .expect("interning into Vec<u8> is infallible")
    }

    /// Like `intern_stack_frames`, but panics instead of returning an error.
    ///
    /// # Panics
    ///
    /// Panics if `intern_stack_frames` returns an error.
    pub fn intern_stack_frames_infallible(&mut self, frames: &[u64]) -> InternedStackFrames {
        self.intern_stack_frames(frames)
            .expect("interning into Vec<u8> is infallible")
    }

    /// Like `reset_to`, but panics instead of returning an error; returns the
    /// previous buffer.
    ///
    /// # Panics
    ///
    /// Panics if `reset_to` returns an error.
    pub fn reset_to_infallible(&mut self, data: Vec<u8>) -> Vec<u8> {
        self.reset_to(data)
            .expect("writing to Vec<u8> is infallible")
    }
}
552
#[cfg(test)]
mod tests {
    use super::*;
    use crate::decoder::{DecodedFrame, Decoder};
    use crate::schema::FieldDef;
    use crate::types::{FieldType, FieldValue, InternedStackFrames};

    /// The five bytes every encoded stream is expected to start with.
    const HEADER: [u8; 5] = [0x54, 0x52, 0x43, 0x00, 1];

    /// Collects `(timestamp_ns, values)` for every event frame, in order.
    macro_rules! events_of {
        ($frames:expr) => {
            $frames
                .into_iter()
                .filter_map(|frame| match frame {
                    DecodedFrame::Event {
                        timestamp_ns,
                        values,
                        ..
                    } => Some((timestamp_ns, values)),
                    _ => None,
                })
                .collect::<Vec<_>>()
        };
    }

    /// A `Varint` field definition with the given name.
    fn varint_field(name: &'static str) -> FieldDef {
        FieldDef {
            name: name.into(),
            field_type: FieldType::Varint,
        }
    }

    /// The field list most tests use: a single `Varint` field named `v`.
    fn v_fields() -> Vec<FieldDef> {
        vec![varint_field("v")]
    }

    /// A fresh in-memory encoder with a one-`Varint`-field schema registered.
    fn encoder_with_v_schema(name: &str) -> (Encoder, Schema) {
        let mut enc = Encoder::new();
        let schema = enc.register_schema(name, v_fields()).unwrap();
        (enc, schema)
    }

    /// Writes an event consisting of a timestamp and one `Varint` value.
    fn write_v<W: Write>(enc: &mut Encoder<W>, schema: &Schema, ts: u64, value: u64) {
        enc.write_event(
            schema,
            &[FieldValue::Varint(ts), FieldValue::Varint(value)],
        )
        .unwrap();
    }

    #[test]
    fn encoder_writes_header() {
        let data = Encoder::new().finish();
        assert_eq!(&data[..5], &HEADER);
    }

    #[test]
    fn encoder_register_and_write_event() {
        let (mut enc, schema) = encoder_with_v_schema("Ev");
        write_v(&mut enc, &schema, 1_000, 42);
        assert!(enc.finish().len() > 5);
    }

    #[test]
    fn idempotent_re_registration() {
        let mut enc = Encoder::new();
        let _first = enc.register_schema("Ev", v_fields()).unwrap();
        let _second = enc.register_schema("Ev", v_fields()).unwrap();
    }

    #[test]
    fn re_registration_different_schema_errors() {
        let (mut enc, _schema) = encoder_with_v_schema("Ev");
        let conflicting = vec![FieldDef {
            name: "different".into(),
            field_type: FieldType::Bool,
        }];
        assert!(enc.register_schema("Ev", conflicting).is_err());
    }

    #[test]
    fn schema_auto_registers_on_write() {
        // The schema is created without going through an encoder first.
        let schema = Schema::new("Lazy", v_fields());

        let mut enc = Encoder::new();
        write_v(&mut enc, &schema, 1_000, 42);

        let bytes = enc.finish();
        let mut dec = Decoder::new(&bytes).unwrap();
        let frames = dec.decode_all();
        assert!(matches!(&frames[0], DecodedFrame::Schema(s) if s.name == "Lazy"));
        match &frames[1] {
            DecodedFrame::Event { values, .. } => {
                assert_eq!(*values, vec![FieldValue::Varint(42)]);
            }
            _ => panic!("expected event"),
        }
    }

    #[test]
    fn schema_portable_across_encoders() {
        let (mut enc1, schema) = encoder_with_v_schema("Shared");
        write_v(&mut enc1, &schema, 1_000, 1);

        // The same handle is used with an encoder that has never seen it.
        let mut enc2 = Encoder::new();
        write_v(&mut enc2, &schema, 2_000, 2);

        for (enc, expected_val) in [(enc1, 1u64), (enc2, 2u64)] {
            let bytes = enc.finish();
            let mut dec = Decoder::new(&bytes).unwrap();
            let frames = dec.decode_all();
            let event = frames
                .iter()
                .find(|f| matches!(f, DecodedFrame::Event { .. }))
                .unwrap();
            if let DecodedFrame::Event { values, .. } = event {
                assert_eq!(values[0], FieldValue::Varint(expected_val));
            }
        }
    }

    #[test]
    fn encoder_intern_string_deduplicates() {
        let mut enc = Encoder::new();
        let hello = enc.intern_string("hello").unwrap();
        let hello_again = enc.intern_string("hello").unwrap();
        let world = enc.intern_string("world").unwrap();
        assert_eq!(hello, hello_again);
        assert_ne!(hello, world);
    }

    #[test]
    fn encoder_intern_stack_frames_deduplicates() {
        let stack_a: &[u64] = &[0x1000, 0x2000, 0x3000];
        let stack_b: &[u64] = &[0x4000, 0x5000];

        let mut enc = Encoder::new();
        let first = enc.intern_stack_frames(stack_a).unwrap();
        let repeat = enc.intern_stack_frames(stack_a).unwrap();
        let other = enc.intern_stack_frames(stack_b).unwrap();
        assert_eq!(first, repeat);
        assert_ne!(first, other);
    }

    #[test]
    fn stack_pool_round_trip_via_decoder() {
        let stack_a: &[u64] = &[0xdead, 0xbeef, 0xcafe];
        let stack_b: &[u64] = &[0x1, 0x2];

        let mut enc = Encoder::new();
        let id_a = enc.intern_stack_frames(stack_a).unwrap();
        let id_b = enc.intern_stack_frames(stack_b).unwrap();
        let bytes = enc.finish();

        let mut dec = Decoder::new(&bytes).unwrap();
        let _ = dec.decode_all();
        assert_eq!(
            dec.stack_pool().get(InternedStackFrames(id_a.raw_id())),
            Some(stack_a)
        );
        assert_eq!(
            dec.stack_pool().get(InternedStackFrames(id_b.raw_id())),
            Some(stack_b)
        );
    }

    #[test]
    fn for_each_event_populates_stack_pool() {
        let mut enc = Encoder::new();
        let callchain_field = FieldDef {
            name: "callchain".into(),
            field_type: FieldType::PooledStackFrames,
        };
        let schema = enc
            .register_schema("CpuSampleEvent", vec![callchain_field])
            .unwrap();
        let stack: &[u64] = &[0x1234, 0x5678, 0x9abc];
        let id = enc.intern_stack_frames(stack).unwrap();
        enc.write_event(
            &schema,
            &[
                FieldValue::Varint(1_000_000),
                FieldValue::PooledStackFrames(id),
            ],
        )
        .unwrap();
        let bytes = enc.finish();

        let mut dec = Decoder::new(&bytes).unwrap();
        let mut event_count = 0;
        dec.for_each_event(|_ev| {
            event_count += 1;
        })
        .unwrap();
        assert_eq!(event_count, 1);
        assert_eq!(
            dec.stack_pool().get(InternedStackFrames(id.raw_id())),
            Some(stack),
        );
    }

    #[test]
    fn encoder_intern_empty_stack_frames() {
        let mut enc = Encoder::new();
        let first = enc.intern_stack_frames(&[]).unwrap();
        let second = enc.intern_stack_frames(&[]).unwrap();
        assert_eq!(first, second);
        let bytes = enc.finish();

        let mut dec = Decoder::new(&bytes).unwrap();
        let _ = dec.decode_all();
        assert_eq!(
            dec.stack_pool().get(InternedStackFrames(first.raw_id())),
            Some(&[][..])
        );
    }

    #[test]
    fn write_stack_pool_multi_entry_round_trip() {
        let entries = vec![
            StackPoolEntry {
                pool_id: 0,
                frames: vec![0xaaaa, 0xbbbb, 0xcccc],
            },
            StackPoolEntry {
                pool_id: 1,
                frames: vec![0x1111],
            },
            StackPoolEntry {
                pool_id: 2,
                frames: vec![],
            },
        ];

        let mut enc = Encoder::new();
        enc.write_stack_pool(&entries).unwrap();
        let bytes = enc.finish();

        let mut dec = Decoder::new(&bytes).unwrap();
        let _ = dec.decode_all();
        assert_eq!(
            dec.stack_pool().get(InternedStackFrames(0)),
            Some(&[0xaaaa, 0xbbbb, 0xcccc][..])
        );
        assert_eq!(
            dec.stack_pool().get(InternedStackFrames(1)),
            Some(&[0x1111][..])
        );
        assert_eq!(dec.stack_pool().get(InternedStackFrames(2)), Some(&[][..]));
    }

    #[test]
    fn decoder_into_encoder_deduplicates_interned_stack_frames() {
        let mut enc = Encoder::new();
        let original = enc.intern_stack_frames(&[0x10, 0x20]).unwrap();
        let base = enc.finish();

        let mut decoder = Decoder::new(&base).unwrap();
        while decoder.next_frame_ref().ok().flatten().is_some() {}
        let mut output = Vec::new();
        let mut ext = decoder.into_encoder(&mut output);
        let reused = ext.intern_stack_frames(&[0x10, 0x20]).unwrap();
        let fresh = ext.intern_stack_frames(&[0x30]).unwrap();
        assert_eq!(original.raw_id(), reused.raw_id());
        assert_ne!(reused.raw_id(), fresh.raw_id());
    }

    #[test]
    fn timestamp_round_trip() {
        let (mut enc, schema) = encoder_with_v_schema("TS");

        // Timestamps deliberately go backwards and make large jumps.
        let expected = [
            (100_000u64, 1u64),
            (50_000u64, 2u64),
            (200_000_000u64, 3u64),
            (100_000_000u64, 4u64),
        ];
        for (ts, value) in expected {
            write_v(&mut enc, &schema, ts, value);
        }

        let bytes = enc.finish();
        let mut dec = Decoder::new(&bytes).unwrap();
        let events = events_of!(dec.decode_all());

        assert_eq!(events.len(), 4);
        for (event, (ts, value)) in events.iter().zip(expected) {
            assert_eq!(event.0, Some(ts));
            assert_eq!(event.1, vec![FieldValue::Varint(value)]);
        }
    }

    #[test]
    fn encoder_new_to_writer() {
        let mut buf = Vec::new();
        let enc = Encoder::new_to(&mut buf).unwrap();
        drop(enc);
        assert!(buf.len() >= 5);
        assert_eq!(&buf[..5], &HEADER);
    }

    #[test]
    fn decoder_into_encoder_appends_without_header() {
        let (mut enc, schema) = encoder_with_v_schema("Ev");
        write_v(&mut enc, &schema, 1_000, 1);
        let base = enc.finish();

        // Drain the decoder, then continue the stream in a second buffer.
        let mut decoder = Decoder::new(&base).unwrap();
        while decoder.next_frame_ref().ok().flatten().is_some() {}
        let mut output = Vec::new();
        let mut ext = decoder.into_encoder(&mut output);
        write_v(&mut ext, &schema, 2_000, 2);
        drop(ext);

        // Base followed by the appended bytes must decode as one stream.
        let combined = [base.as_slice(), output.as_slice()].concat();
        let mut dec = Decoder::new(&combined).unwrap();
        let events = events_of!(dec.decode_all());
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].0, Some(1_000));
        assert_eq!(events[1].0, Some(2_000));
    }

    #[test]
    fn decoder_into_encoder_deduplicates_interned_strings() {
        let mut enc = Encoder::new();
        let original = enc.intern_string("hello").unwrap();
        let base = enc.finish();

        let mut decoder = Decoder::new(&base).unwrap();
        while decoder.next_frame_ref().ok().flatten().is_some() {}
        let mut output = Vec::new();
        let mut ext = decoder.into_encoder(&mut output);
        let reused = ext.intern_string("hello").unwrap();
        let fresh = ext.intern_string("world").unwrap();
        drop(ext);

        assert_eq!(original, reused, "existing string should reuse pool ID");
        assert_ne!(reused, fresh);

        let combined = [base.as_slice(), output.as_slice()].concat();
        let mut dec = Decoder::new(&combined).unwrap();
        let frames = dec.decode_all();
        let pool_frame_count = frames
            .iter()
            .filter(|f| matches!(f, DecodedFrame::StringPool(_)))
            .count();
        assert_eq!(pool_frame_count, 2);
    }

    #[test]
    fn register_and_write() {
        let mut enc = Encoder::new();
        let fields = vec![
            varint_field("count"),
            FieldDef {
                name: "name".into(),
                field_type: FieldType::String,
            },
        ];
        let schema = enc.register_schema("MyEvent", fields).unwrap();

        enc.write_event(
            &schema,
            &[
                FieldValue::Varint(1_000_000),
                FieldValue::Varint(42),
                FieldValue::String("hello".into()),
            ],
        )
        .unwrap();

        let bytes = enc.finish();
        let mut dec = Decoder::new(&bytes).unwrap();
        let events = events_of!(dec.decode_all());
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].0, Some(1_000_000));
        assert_eq!(events[0].1[0], FieldValue::Varint(42));
        assert_eq!(events[0].1[1], FieldValue::String("hello".into()));
    }

    #[test]
    fn register_conflict_errors() {
        let (mut enc, _schema) = encoder_with_v_schema("Ev");
        let conflicting = vec![FieldDef {
            name: "other".into(),
            field_type: FieldType::Bool,
        }];
        assert!(enc.register_schema("Ev", conflicting).is_err());
    }

    #[test]
    fn write_wrong_field_count_errors() {
        let (mut enc, schema) = encoder_with_v_schema("Ev");
        // Timestamp plus two values, but the schema declares a single field.
        let too_many = [
            FieldValue::Varint(0),
            FieldValue::Varint(1),
            FieldValue::Varint(2),
        ];
        assert!(enc.write_event(&schema, &too_many).is_err());
    }

    #[test]
    fn timestamp_base_advances_per_event() {
        let (mut enc, schema) = encoder_with_v_schema("Ev");

        let ts1 = 12_000_000u64;
        let ts2 = 24_000_000u64;
        write_v(&mut enc, &schema, ts1, 1);
        write_v(&mut enc, &schema, ts2, 2);

        let bytes = enc.finish();

        let reset_count = bytes.iter().filter(|&&b| b == 0x05).count();
        assert_eq!(
            reset_count, 0,
            "base should advance per event, avoiding unnecessary resets"
        );

        let mut dec = Decoder::new(&bytes).unwrap();
        let timestamps: Vec<_> = dec
            .decode_all()
            .into_iter()
            .filter_map(|frame| match frame {
                DecodedFrame::Event { timestamp_ns, .. } => timestamp_ns,
                _ => None,
            })
            .collect();
        assert_eq!(timestamps, vec![ts1, ts2]);
    }

    #[test]
    fn reset_to_preserves_capacity() {
        let mut enc = Encoder::new();
        for i in 0..100 {
            enc.intern_string(&format!("string_{}", i)).unwrap();
        }
        let cap_before = enc.string_pool.capacity();
        let _bytes = enc.reset_to(Vec::new());
        let cap_after = enc.string_pool.capacity();
        assert_eq!(
            cap_before, cap_after,
            "string_pool capacity should be preserved after reset_to"
        );
    }

    #[test]
    fn reset_to_returns_old_data_and_clears_state() {
        let (mut enc, schema) = encoder_with_v_schema("Ev");
        write_v(&mut enc, &schema, 1_000, 42);
        let _s = enc.intern_string("hello").unwrap();

        let old_bytes_written = enc.bytes_written();
        assert!(old_bytes_written > 0);

        let old = enc.reset_to_infallible(Vec::new());

        // The returned buffer holds everything written before the reset.
        let mut dec = Decoder::new(&old).unwrap();
        let frames = dec.decode_all();
        assert!(frames.iter().any(|f| matches!(f, DecodedFrame::Schema(_))));
        assert!(
            frames
                .iter()
                .any(|f| matches!(f, DecodedFrame::Event { .. }))
        );
        assert!(
            frames
                .iter()
                .any(|f| matches!(f, DecodedFrame::StringPool(_)))
        );

        assert!(
            enc.bytes_written() < old_bytes_written,
            "bytes_written should reset (got {} vs old {})",
            enc.bytes_written(),
            old_bytes_written
        );

        // After the reset the schema and the string must be emitted again.
        write_v(&mut enc, &schema, 2_000, 99);
        let _s2 = enc.intern_string("hello").unwrap();

        let new_bytes = enc.reset_to_infallible(Vec::new());
        let mut dec2 = Decoder::new(&new_bytes).unwrap();
        let new_frames = dec2.decode_all();
        assert!(
            new_frames
                .iter()
                .any(|f| matches!(f, DecodedFrame::Schema(s) if s.name == "Ev")),
            "new trace must contain schema definition"
        );
        assert!(
            new_frames
                .iter()
                .any(|f| matches!(f, DecodedFrame::StringPool(_))),
            "new trace must contain string pool"
        );
        let event = new_frames
            .iter()
            .find_map(|f| match f {
                DecodedFrame::Event {
                    timestamp_ns,
                    values,
                    ..
                } => Some((timestamp_ns, values)),
                _ => None,
            })
            .expect("new trace must contain event");
        assert_eq!(*event.0, Some(2_000));
        assert_eq!(event.1[0], FieldValue::Varint(99));
    }

    #[test]
    fn into_raw_encoder_preserves_byte_count() {
        let (mut enc, schema) = encoder_with_v_schema("Ev");
        write_v(&mut enc, &schema, 1_000, 42);

        let bytes_before = enc.bytes_written();
        assert!(bytes_before > 0);

        let raw = enc.into_raw_encoder();
        assert_eq!(
            raw.bytes_written(),
            bytes_before,
            "byte count must be preserved across conversion"
        );
    }

    #[test]
    fn raw_encoder_write_raw_and_bytes_written() {
        let enc = Encoder::new();
        let initial = enc.bytes_written();
        let mut raw = enc.into_raw_encoder();

        let payload = [0xAA; 100];
        raw.write_raw(&payload).unwrap();

        assert_eq!(
            raw.bytes_written(),
            initial + payload.len() as u64,
            "bytes_written must include raw payload"
        );
    }

    #[test]
    fn raw_encoder_into_inner_returns_all_data() {
        let (mut enc, schema) = encoder_with_v_schema("Ev");
        write_v(&mut enc, &schema, 1_000, 1);

        // A second, independently encoded stream to append as raw bytes.
        let raw_batch = {
            let mut batch_enc = Encoder::new();
            write_v(&mut batch_enc, &schema, 2_000, 2);
            batch_enc.finish()
        };

        let mut raw = enc.into_raw_encoder();
        raw.write_raw(&raw_batch).unwrap();
        let combined = raw.into_inner();

        let mut dec = Decoder::new(&combined).unwrap();
        let events = events_of!(dec.decode_all());

        assert_eq!(events.len(), 2);
        assert_eq!(events[0].0, Some(1_000));
        assert_eq!(events[0].1, vec![FieldValue::Varint(1)]);
        assert_eq!(events[1].0, Some(2_000));
        assert_eq!(events[1].1, vec![FieldValue::Varint(2)]);
    }
}
1344}