1use crate::TraceEvent;
8use crate::codec::{self, PoolEntry, WireTypeId};
9use crate::schema::{SchemaEntry, SchemaRegistry};
10use crate::types::{CountingWriter, EncodeState, EventEncoder, InternedString};
11use std::any::TypeId;
12use std::collections::HashMap;
13use std::hash::{BuildHasherDefault, Hasher};
14use std::io::{self, Write};
15use std::sync::Arc;
16
17#[derive(Default)]
22pub(crate) struct FxHasher(u64);
23
24impl Hasher for FxHasher {
25 #[inline]
26 fn write(&mut self, bytes: &[u8]) {
27 for &b in bytes {
28 self.0 = (self.0.rotate_left(5) ^ b as u64).wrapping_mul(0x517cc1b727220a95);
29 }
30 }
31
32 #[inline]
33 fn write_u64(&mut self, i: u64) {
34 self.0 = (self.0.rotate_left(5) ^ i).wrapping_mul(0x517cc1b727220a95);
35 }
36
37 #[inline]
38 fn write_u32(&mut self, i: u32) {
39 self.write_u64(i as u64)
40 }
41
42 #[inline]
43 fn write_u16(&mut self, i: u16) {
44 self.write_u64(i as u64)
45 }
46
47 #[inline]
48 fn write_usize(&mut self, i: usize) {
49 self.write_u64(i as u64)
50 }
51
52 #[inline]
53 fn write_u128(&mut self, i: u128) {
54 self.write_u64(i as u64);
55 self.write_u64((i >> 64) as u64);
56 }
57
58 #[inline]
59 fn finish(&self) -> u64 {
60 self.0
61 }
62}
63
64pub(crate) type FxBuildHasher = BuildHasherDefault<FxHasher>;
65pub(crate) type FxHashMap<K, V> = HashMap<K, V, FxBuildHasher>;
66
67#[derive(Clone, Debug)]
76pub struct Schema {
77 pub(crate) entry: Arc<SchemaEntry>,
78 name_key: Arc<str>,
81}
82
83impl Schema {
84 pub fn new(name: &str, fields: Vec<crate::schema::FieldDef>) -> Self {
89 let name_key: Arc<str> = Arc::from(name);
90 Self {
91 entry: Arc::new(SchemaEntry {
92 name: name.to_string(),
93 has_timestamp: true,
94 fields,
95 }),
96 name_key,
97 }
98 }
99
100 pub fn name(&self) -> &str {
102 &self.entry.name
103 }
104
105 pub fn fields(&self) -> &[crate::schema::FieldDef] {
107 &self.entry.fields
108 }
109}
110
111#[derive(Clone, PartialEq, Eq, Hash)]
114enum SchemaKey {
115 Name(Arc<str>),
116 RustType(TypeId),
117}
118
119pub struct Encoder<W: Write = Vec<u8>> {
127 state: EncodeState<W>,
128 registry: SchemaRegistry,
129 string_pool: FxHashMap<String, u32>,
130 next_pool_id: u32,
131 schema_ids: FxHashMap<SchemaKey, WireTypeId>,
132}
133
134impl Default for Encoder<Vec<u8>> {
135 fn default() -> Self {
136 Self::new()
137 }
138}
139
140impl Encoder<Vec<u8>> {
141 pub fn new() -> Self {
142 let mut buf = Vec::new();
143 codec::encode_header(&mut buf).expect("Vec::write_all cannot fail");
144 Self {
145 state: EncodeState::new(buf),
146 registry: SchemaRegistry::new(),
147 string_pool: FxHashMap::default(),
148 next_pool_id: 0,
149 schema_ids: FxHashMap::default(),
150 }
151 }
152
153 pub fn finish(self) -> Vec<u8> {
155 self.state.writer.into_inner()
156 }
157}
158
159impl<W: Write> Encoder<W> {
160 pub fn new_to(mut writer: W) -> io::Result<Self> {
163 codec::encode_header(&mut writer)?;
164 Ok(Self {
165 state: EncodeState::new(writer),
166 registry: SchemaRegistry::new(),
167 string_pool: FxHashMap::default(),
168 next_pool_id: 0,
169 schema_ids: FxHashMap::default(),
170 })
171 }
172
173 pub(crate) fn from_decoder(
176 mut registry: SchemaRegistry,
177 string_pool: crate::decoder::StringPool,
178 timestamp_base_ns: u64,
179 writer: W,
180 ) -> Self {
181 let mut pool = FxHashMap::default();
182 let mut next_pool_id: u32 = 0;
183 for (id, value) in string_pool.0.into_iter() {
184 pool.insert(value, id.raw_id());
185 if id.raw_id() >= next_pool_id {
186 next_pool_id = id.raw_id() + 1;
187 }
188 }
189
190 let mut schema_ids = FxHashMap::default();
191 for (wire_id, entry) in registry.entries() {
192 schema_ids.insert(SchemaKey::Name(Arc::from(entry.name.as_str())), wire_id);
193 }
194 registry.sync_next_id();
195
196 let mut state = EncodeState::new(writer);
197 state.set_ts_base_unchecked(timestamp_base_ns);
198
199 Self {
200 state,
201 registry,
202 string_pool: pool,
203 next_pool_id,
204 schema_ids,
205 }
206 }
207
208 pub fn into_inner(self) -> W {
210 self.state.writer.into_inner()
211 }
212
213 pub fn as_inner(&self) -> &W {
215 self.state.writer.inner()
216 }
217
218 pub fn bytes_written(&self) -> u64 {
220 self.state.writer.bytes_written()
221 }
222
223 pub fn reset_to(&mut self, mut new_writer: W) -> io::Result<W> {
226 codec::encode_header(&mut new_writer)?;
227 self.string_pool.clear();
228 self.next_pool_id = 0;
229 self.registry.clear();
230 self.schema_ids.clear();
231 let old_state = std::mem::replace(&mut self.state, EncodeState::new(new_writer));
233 Ok(old_state.writer.into_inner())
234 }
235
236 fn ensure_registered(&mut self, schema: &Schema) -> io::Result<WireTypeId> {
242 let key = SchemaKey::Name(Arc::clone(&schema.name_key));
243 if let Some(&wire_id) = self.schema_ids.get(&key) {
244 let Some(existing) = self.registry.get(wire_id) else {
246 return Err(io::Error::other(format!(
247 "corrupted internal state. {wire_id:?} in schema_ids but not in registry."
248 )));
249 };
250 if *existing == *schema.entry {
251 return Ok(wire_id);
252 }
253 return Err(io::Error::new(
254 io::ErrorKind::InvalidInput,
255 format!(
256 "schema already registered with different definition: {}",
257 schema.name()
258 ),
259 ));
260 }
261 let id = self.registry.next_type_id();
262 codec::encode_schema(id, &schema.entry, &mut self.state.writer)?;
263 self.registry
264 .register(id, (*schema.entry).clone())
265 .expect("schema registration failed");
266 self.schema_ids.insert(key, id);
267 Ok(id)
268 }
269
270 pub fn register_schema(
280 &mut self,
281 name: &str,
282 fields: Vec<crate::schema::FieldDef>,
283 ) -> io::Result<Schema> {
284 let schema = Schema::new(name, fields);
285 self.ensure_registered(&schema)?;
286 Ok(schema)
287 }
288
289 pub fn write_event(
298 &mut self,
299 schema: &Schema,
300 values: &[crate::types::FieldValue],
301 ) -> io::Result<()> {
302 use crate::types::FieldValue;
303
304 let type_id = self.ensure_registered(schema)?;
305 let expected_fields = schema.entry.fields.len();
306
307 let ts_ns = match values.first() {
308 Some(FieldValue::Varint(ns)) => *ns,
309 _ => {
310 return Err(io::Error::new(
311 io::ErrorKind::InvalidInput,
312 "first value must be FieldValue::Varint(timestamp_ns)",
313 ));
314 }
315 };
316 let field_values = &values[1..];
317
318 if field_values.len() != expected_fields {
319 return Err(io::Error::new(
320 io::ErrorKind::InvalidInput,
321 format!(
322 "value count ({}) does not match schema field count ({}) for schema '{}'",
323 field_values.len(),
324 expected_fields,
325 schema.name(),
326 ),
327 ));
328 }
329
330 let ts_delta = self.state.encode_timestamp_delta(ts_ns)?;
331 self.state.writer.write_all(&[codec::TAG_EVENT])?;
332 self.state.writer.write_all(&type_id.0.to_le_bytes())?;
333 codec::encode_u24_le(ts_delta, &mut self.state.writer)?;
334 let mut enc = EventEncoder::new(&mut self.state);
335 for (i, v) in field_values.iter().enumerate() {
336 enc.write_field_value(v, schema.entry.fields[i].field_type)?;
337 }
338 Ok(())
339 }
340
341 pub fn write<T: TraceEvent + 'static>(&mut self, event: &T) -> io::Result<()> {
344 let key = SchemaKey::RustType(TypeId::of::<T>());
345 let tid = if let Some(&cached) = self.schema_ids.get(&key) {
346 cached
347 } else {
348 let entry = T::schema_entry();
349 let schema = Schema::new(&entry.name, entry.fields);
350 let id = self.ensure_registered(&schema)?;
351 self.schema_ids.insert(key, id);
352 id
353 };
354 let ts_ns = event.timestamp();
355 let ts_delta = self.state.encode_timestamp_delta(ts_ns)?;
356 self.state.writer.write_all(&[codec::TAG_EVENT])?;
357 self.state.writer.write_all(&tid.0.to_le_bytes())?;
358 codec::encode_u24_le(ts_delta, &mut self.state.writer)?;
359 let mut enc = EventEncoder::new(&mut self.state);
360 event.encode_fields(&mut enc)
361 }
362
363 pub fn intern_string(&mut self, s: &str) -> io::Result<InternedString> {
365 if let Some(&id) = self.string_pool.get(s) {
366 return Ok(InternedString(id));
367 }
368 let id = self.next_pool_id;
369 self.next_pool_id += 1;
370 self.string_pool.insert(s.to_string(), id);
371 codec::encode_string_pool(
372 &[PoolEntry {
373 pool_id: id,
374 data: s.as_bytes().to_vec(),
375 }],
376 &mut self.state.writer,
377 )?;
378 Ok(InternedString(id))
379 }
380
381 pub fn write_string_pool(&mut self, entries: &[PoolEntry]) -> io::Result<()> {
382 codec::encode_string_pool(entries, &mut self.state.writer)
383 }
384
385 pub fn flush(&mut self) -> io::Result<()> {
387 self.state.writer.flush()
388 }
389
390 pub fn into_raw_encoder(self) -> RawEncoder<W> {
397 RawEncoder {
398 writer: self.state.writer,
399 }
400 }
401}
402
403pub struct RawEncoder<W> {
410 writer: CountingWriter<W>,
411}
412
413impl<W: Write> RawEncoder<W> {
414 pub fn write_raw(&mut self, bytes: &[u8]) -> io::Result<()> {
416 self.writer.write_all(bytes)
417 }
418
419 pub fn bytes_written(&self) -> u64 {
422 self.writer.bytes_written()
423 }
424
425 pub fn flush(&mut self) -> io::Result<()> {
427 self.writer.flush()
428 }
429
430 pub fn into_inner(self) -> W {
432 self.writer.into_inner()
433 }
434}
435
436impl Encoder<Vec<u8>> {
437 pub fn write_infallible<T: TraceEvent + 'static>(&mut self, event: &T) {
438 self.write(event).expect("writing to Vec<u8> is infallible")
439 }
440
441 pub fn intern_string_infallible(&mut self, s: &str) -> InternedString {
442 self.intern_string(s)
443 .expect("interning into Vec<u8> is infallible")
444 }
445
446 pub fn reset_to_infallible(&mut self, data: Vec<u8>) -> Vec<u8> {
448 self.reset_to(data)
449 .expect("writing to Vec<u8> is infallible")
450 }
451}
452
453#[cfg(test)]
454mod tests {
455 use super::*;
456 use crate::schema::FieldDef;
457 use crate::types::{FieldType, FieldValue};
458
459 #[test]
460 fn encoder_writes_header() {
461 let enc = Encoder::new();
462 let data = enc.finish();
463 assert_eq!(&data[..5], &[0x54, 0x52, 0x43, 0x00, 1]);
464 }
465
466 #[test]
467 fn encoder_register_and_write_event() {
468 let mut enc = Encoder::new();
469 let schema = enc
470 .register_schema(
471 "Ev",
472 vec![FieldDef {
473 name: "v".into(),
474 field_type: FieldType::Varint,
475 }],
476 )
477 .unwrap();
478 enc.write_event(
479 &schema,
480 &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
481 )
482 .unwrap();
483 let data = enc.finish();
484 assert!(data.len() > 5);
485 }
486
487 #[test]
488 fn idempotent_re_registration() {
489 let mut enc = Encoder::new();
490 let fields = vec![FieldDef {
491 name: "v".into(),
492 field_type: FieldType::Varint,
493 }];
494 let _s1 = enc.register_schema("Ev", fields.clone()).unwrap();
495 let _s2 = enc.register_schema("Ev", fields).unwrap();
496 }
498
499 #[test]
500 fn re_registration_different_schema_errors() {
501 let mut enc = Encoder::new();
502 enc.register_schema(
503 "Ev",
504 vec![FieldDef {
505 name: "v".into(),
506 field_type: FieldType::Varint,
507 }],
508 )
509 .unwrap();
510 let result = enc.register_schema(
511 "Ev",
512 vec![FieldDef {
513 name: "different".into(),
514 field_type: FieldType::Bool,
515 }],
516 );
517 assert!(result.is_err());
518 }
519
520 #[test]
521 fn schema_auto_registers_on_write() {
522 use crate::decoder::{DecodedFrame, Decoder};
523
524 let schema = Schema::new(
526 "Lazy",
527 vec![FieldDef {
528 name: "v".into(),
529 field_type: FieldType::Varint,
530 }],
531 );
532
533 let mut enc = Encoder::new();
535 enc.write_event(
536 &schema,
537 &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
538 )
539 .unwrap();
540
541 let bytes = enc.finish();
542 let mut dec = Decoder::new(&bytes).unwrap();
543 let frames = dec.decode_all();
544 assert!(matches!(&frames[0], DecodedFrame::Schema(s) if s.name == "Lazy"));
545 if let DecodedFrame::Event { values, .. } = &frames[1] {
546 assert_eq!(*values, vec![FieldValue::Varint(42)]);
547 } else {
548 panic!("expected event");
549 }
550 }
551
552 #[test]
553 fn schema_portable_across_encoders() {
554 use crate::decoder::{DecodedFrame, Decoder};
555
556 let mut enc1 = Encoder::new();
557 let schema = enc1
558 .register_schema(
559 "Shared",
560 vec![FieldDef {
561 name: "v".into(),
562 field_type: FieldType::Varint,
563 }],
564 )
565 .unwrap();
566 enc1.write_event(&schema, &[FieldValue::Varint(1_000), FieldValue::Varint(1)])
567 .unwrap();
568
569 let mut enc2 = Encoder::new();
571 enc2.write_event(&schema, &[FieldValue::Varint(2_000), FieldValue::Varint(2)])
572 .unwrap();
573
574 for (enc, expected_val) in [(enc1, 1u64), (enc2, 2u64)] {
576 let bytes = enc.finish();
577 let mut dec = Decoder::new(&bytes).unwrap();
578 let frames = dec.decode_all();
579 let event = frames
580 .iter()
581 .find(|f| matches!(f, DecodedFrame::Event { .. }))
582 .unwrap();
583 if let DecodedFrame::Event { values, .. } = event {
584 assert_eq!(values[0], FieldValue::Varint(expected_val));
585 }
586 }
587 }
588
589 #[test]
590 fn encoder_intern_string_deduplicates() {
591 let mut enc = Encoder::new();
592 let id1 = enc.intern_string("hello").unwrap();
593 let id2 = enc.intern_string("hello").unwrap();
594 let id3 = enc.intern_string("world").unwrap();
595 assert_eq!(id1, id2);
596 assert_ne!(id1, id3);
597 }
598
599 #[test]
600 fn timestamp_round_trip() {
601 use crate::decoder::{DecodedFrame, Decoder};
602
603 let mut enc = Encoder::new();
604 let schema = enc
605 .register_schema(
606 "TS",
607 vec![FieldDef {
608 name: "v".into(),
609 field_type: FieldType::Varint,
610 }],
611 )
612 .unwrap();
613
614 let ts1 = 100_000u64;
615 let ts2 = 50_000u64;
616 let ts3 = 200_000_000u64;
617 let ts4 = 100_000_000u64;
618 enc.write_event(&schema, &[FieldValue::Varint(ts1), FieldValue::Varint(1)])
619 .unwrap();
620 enc.write_event(&schema, &[FieldValue::Varint(ts2), FieldValue::Varint(2)])
621 .unwrap();
622 enc.write_event(&schema, &[FieldValue::Varint(ts3), FieldValue::Varint(3)])
623 .unwrap();
624 enc.write_event(&schema, &[FieldValue::Varint(ts4), FieldValue::Varint(4)])
625 .unwrap();
626
627 let bytes = enc.finish();
628 let mut dec = Decoder::new(&bytes).unwrap();
629 let events: Vec<_> = dec
630 .decode_all()
631 .into_iter()
632 .filter_map(|f| match f {
633 DecodedFrame::Event {
634 timestamp_ns,
635 values,
636 ..
637 } => Some((timestamp_ns, values)),
638 _ => None,
639 })
640 .collect();
641
642 assert_eq!(events.len(), 4);
643 assert_eq!(events[0].0, Some(ts1));
644 assert_eq!(events[0].1, vec![FieldValue::Varint(1)]);
645 assert_eq!(events[1].0, Some(ts2));
646 assert_eq!(events[1].1, vec![FieldValue::Varint(2)]);
647 assert_eq!(events[2].0, Some(ts3));
648 assert_eq!(events[2].1, vec![FieldValue::Varint(3)]);
649 assert_eq!(events[3].0, Some(ts4));
650 assert_eq!(events[3].1, vec![FieldValue::Varint(4)]);
651 }
652
653 #[test]
654 fn encoder_new_to_writer() {
655 let mut buf = Vec::new();
656 let enc = Encoder::new_to(&mut buf).unwrap();
657 drop(enc);
658 assert!(buf.len() >= 5);
659 assert_eq!(&buf[..5], &[0x54, 0x52, 0x43, 0x00, 1]);
660 }
661
662 #[test]
663 fn decoder_into_encoder_appends_without_header() {
664 use crate::decoder::{DecodedFrame, Decoder};
665
666 let mut enc = Encoder::new();
668 let schema = enc
669 .register_schema(
670 "Ev",
671 vec![FieldDef {
672 name: "v".into(),
673 field_type: FieldType::Varint,
674 }],
675 )
676 .unwrap();
677 enc.write_event(&schema, &[FieldValue::Varint(1_000), FieldValue::Varint(1)])
678 .unwrap();
679 let base = enc.finish();
680
681 let mut decoder = Decoder::new(&base).unwrap();
683 while decoder.next_frame_ref().ok().flatten().is_some() {}
684 let mut output = Vec::new();
685 let mut ext = decoder.into_encoder(&mut output);
686 ext.write_event(&schema, &[FieldValue::Varint(2_000), FieldValue::Varint(2)])
688 .unwrap();
689 drop(ext);
690
691 let mut combined = base.clone();
693 combined.extend_from_slice(&output);
694 let mut dec = Decoder::new(&combined).unwrap();
695 let events: Vec<_> = dec
696 .decode_all()
697 .into_iter()
698 .filter_map(|f| match f {
699 DecodedFrame::Event {
700 timestamp_ns,
701 values,
702 ..
703 } => Some((timestamp_ns, values)),
704 _ => None,
705 })
706 .collect();
707 assert_eq!(events.len(), 2);
708 assert_eq!(events[0].0, Some(1_000));
709 assert_eq!(events[1].0, Some(2_000));
710 }
711
712 #[test]
713 fn decoder_into_encoder_deduplicates_interned_strings() {
714 use crate::decoder::{DecodedFrame, Decoder};
715
716 let mut enc = Encoder::new();
718 let id1 = enc.intern_string("hello").unwrap();
719 let base = enc.finish();
720
721 let mut decoder = Decoder::new(&base).unwrap();
723 while decoder.next_frame_ref().ok().flatten().is_some() {}
724 let mut output = Vec::new();
725 let mut ext = decoder.into_encoder(&mut output);
726 let id2 = ext.intern_string("hello").unwrap();
728 let id3 = ext.intern_string("world").unwrap();
729 drop(ext);
730
731 assert_eq!(id1, id2, "existing string should reuse pool ID");
732 assert_ne!(id2, id3);
733
734 let mut combined = base.clone();
736 combined.extend_from_slice(&output);
737 let mut dec = Decoder::new(&combined).unwrap();
738 let frames = dec.decode_all();
739 let pool_frames: Vec<_> = frames
740 .iter()
741 .filter(|f| matches!(f, DecodedFrame::StringPool(_)))
742 .collect();
743 assert_eq!(pool_frames.len(), 2);
745 }
746
747 #[test]
748 fn register_and_write() {
749 use crate::decoder::{DecodedFrame, Decoder};
750
751 let mut enc = Encoder::new();
752 let schema = enc
753 .register_schema(
754 "MyEvent",
755 vec![
756 FieldDef {
757 name: "count".into(),
758 field_type: FieldType::Varint,
759 },
760 FieldDef {
761 name: "name".into(),
762 field_type: FieldType::String,
763 },
764 ],
765 )
766 .unwrap();
767
768 enc.write_event(
769 &schema,
770 &[
771 FieldValue::Varint(1_000_000),
772 FieldValue::Varint(42),
773 FieldValue::String("hello".into()),
774 ],
775 )
776 .unwrap();
777
778 let bytes = enc.finish();
779 let mut dec = Decoder::new(&bytes).unwrap();
780 let frames = dec.decode_all();
781 let events: Vec<_> = frames
782 .into_iter()
783 .filter_map(|f| match f {
784 DecodedFrame::Event {
785 timestamp_ns,
786 values,
787 ..
788 } => Some((timestamp_ns, values)),
789 _ => None,
790 })
791 .collect();
792 assert_eq!(events.len(), 1);
793 assert_eq!(events[0].0, Some(1_000_000));
794 assert_eq!(events[0].1[0], FieldValue::Varint(42));
795 assert_eq!(events[0].1[1], FieldValue::String("hello".into()));
796 }
797
798 #[test]
799 fn register_conflict_errors() {
800 let mut enc = Encoder::new();
801 enc.register_schema(
802 "Ev",
803 vec![FieldDef {
804 name: "v".into(),
805 field_type: FieldType::Varint,
806 }],
807 )
808 .unwrap();
809 let result = enc.register_schema(
810 "Ev",
811 vec![FieldDef {
812 name: "other".into(),
813 field_type: FieldType::Bool,
814 }],
815 );
816 assert!(result.is_err());
817 }
818
819 #[test]
820 fn write_wrong_field_count_errors() {
821 let mut enc = Encoder::new();
822 let schema = enc
823 .register_schema(
824 "Ev",
825 vec![FieldDef {
826 name: "v".into(),
827 field_type: FieldType::Varint,
828 }],
829 )
830 .unwrap();
831 let result = enc.write_event(
833 &schema,
834 &[
835 FieldValue::Varint(0),
836 FieldValue::Varint(1),
837 FieldValue::Varint(2),
838 ],
839 );
840 assert!(result.is_err());
841 }
842
843 #[test]
846 fn timestamp_base_advances_per_event() {
847 use crate::decoder::{DecodedFrame, Decoder};
848
849 let mut enc = Encoder::new();
850 let schema = enc
851 .register_schema(
852 "Ev",
853 vec![FieldDef {
854 name: "v".into(),
855 field_type: FieldType::Varint,
856 }],
857 )
858 .unwrap();
859
860 let ts1 = 12_000_000u64;
861 let ts2 = 24_000_000u64;
862 enc.write_event(&schema, &[FieldValue::Varint(ts1), FieldValue::Varint(1)])
863 .unwrap();
864 enc.write_event(&schema, &[FieldValue::Varint(ts2), FieldValue::Varint(2)])
865 .unwrap();
866
867 let bytes = enc.finish();
868
869 let reset_count = bytes.iter().filter(|&&b| b == 0x05).count();
870 assert_eq!(
871 reset_count, 0,
872 "base should advance per event, avoiding unnecessary resets"
873 );
874
875 let mut dec = Decoder::new(&bytes).unwrap();
876 let events: Vec<_> = dec
877 .decode_all()
878 .into_iter()
879 .filter_map(|f| match f {
880 DecodedFrame::Event { timestamp_ns, .. } => timestamp_ns,
881 _ => None,
882 })
883 .collect();
884 assert_eq!(events, vec![ts1, ts2]);
885 }
886
887 #[test]
888 fn reset_to_preserves_capacity() {
889 let mut enc = Encoder::new();
890 for i in 0..100 {
891 enc.intern_string(&format!("string_{}", i)).unwrap();
892 }
893 let cap_before = enc.string_pool.capacity();
894 let _bytes = enc.reset_to(Vec::new());
895 let cap_after = enc.string_pool.capacity();
896 assert_eq!(
897 cap_before, cap_after,
898 "string_pool capacity should be preserved after reset_to"
899 );
900 }
901
902 #[test]
903 fn reset_to_returns_old_data_and_clears_state() {
904 use crate::decoder::{DecodedFrame, Decoder};
905
906 let mut enc = Encoder::new();
907 let schema = enc
908 .register_schema(
909 "Ev",
910 vec![FieldDef {
911 name: "v".into(),
912 field_type: FieldType::Varint,
913 }],
914 )
915 .unwrap();
916 enc.write_event(
917 &schema,
918 &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
919 )
920 .unwrap();
921 let _s = enc.intern_string("hello").unwrap();
922
923 let old_bytes_written = enc.bytes_written();
924 assert!(old_bytes_written > 0);
925
926 let old = enc.reset_to_infallible(Vec::new());
928
929 let mut dec = Decoder::new(&old).unwrap();
931 let frames = dec.decode_all();
932 assert!(frames.iter().any(|f| matches!(f, DecodedFrame::Schema(_))));
933 assert!(
934 frames
935 .iter()
936 .any(|f| matches!(f, DecodedFrame::Event { .. }))
937 );
938 assert!(
939 frames
940 .iter()
941 .any(|f| matches!(f, DecodedFrame::StringPool(_)))
942 );
943
944 assert!(
946 enc.bytes_written() < old_bytes_written,
947 "bytes_written should reset (got {} vs old {})",
948 enc.bytes_written(),
949 old_bytes_written
950 );
951
952 enc.write_event(
955 &schema,
956 &[FieldValue::Varint(2_000), FieldValue::Varint(99)],
957 )
958 .unwrap();
959
960 let _s2 = enc.intern_string("hello").unwrap();
962
963 let new_bytes = enc.reset_to_infallible(Vec::new());
965 let mut dec2 = Decoder::new(&new_bytes).unwrap();
966 let new_frames = dec2.decode_all();
967 assert!(
969 new_frames
970 .iter()
971 .any(|f| matches!(f, DecodedFrame::Schema(s) if s.name == "Ev")),
972 "new trace must contain schema definition"
973 );
974 assert!(
976 new_frames
977 .iter()
978 .any(|f| matches!(f, DecodedFrame::StringPool(_))),
979 "new trace must contain string pool"
980 );
981 let event = new_frames
983 .iter()
984 .find_map(|f| match f {
985 DecodedFrame::Event {
986 timestamp_ns,
987 values,
988 ..
989 } => Some((timestamp_ns, values)),
990 _ => None,
991 })
992 .expect("new trace must contain event");
993 assert_eq!(*event.0, Some(2_000));
994 assert_eq!(event.1[0], FieldValue::Varint(99));
995 }
996
997 #[test]
998 fn into_raw_encoder_preserves_byte_count() {
999 let mut enc = Encoder::new();
1000 let schema = enc
1001 .register_schema(
1002 "Ev",
1003 vec![FieldDef {
1004 name: "v".into(),
1005 field_type: FieldType::Varint,
1006 }],
1007 )
1008 .unwrap();
1009 enc.write_event(
1010 &schema,
1011 &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
1012 )
1013 .unwrap();
1014
1015 let bytes_before = enc.bytes_written();
1016 assert!(bytes_before > 0);
1017
1018 let raw = enc.into_raw_encoder();
1019 assert_eq!(
1020 raw.bytes_written(),
1021 bytes_before,
1022 "byte count must be preserved across conversion"
1023 );
1024 }
1025
1026 #[test]
1027 fn raw_encoder_write_raw_and_bytes_written() {
1028 let enc = Encoder::new();
1029 let initial = enc.bytes_written();
1030 let mut raw = enc.into_raw_encoder();
1031
1032 let payload = [0xAA; 100];
1033 raw.write_raw(&payload).unwrap();
1034
1035 assert_eq!(
1036 raw.bytes_written(),
1037 initial + payload.len() as u64,
1038 "bytes_written must include raw payload"
1039 );
1040 }
1041
1042 #[test]
1043 fn raw_encoder_into_inner_returns_all_data() {
1044 use crate::decoder::{DecodedFrame, Decoder};
1045
1046 let mut enc = Encoder::new();
1049 let schema = enc
1050 .register_schema(
1051 "Ev",
1052 vec![FieldDef {
1053 name: "v".into(),
1054 field_type: FieldType::Varint,
1055 }],
1056 )
1057 .unwrap();
1058 enc.write_event(&schema, &[FieldValue::Varint(1_000), FieldValue::Varint(1)])
1059 .unwrap();
1060
1061 let raw_batch = {
1063 let mut batch_enc = Encoder::new();
1064 batch_enc
1065 .write_event(&schema, &[FieldValue::Varint(2_000), FieldValue::Varint(2)])
1066 .unwrap();
1067 batch_enc.finish()
1068 };
1069
1070 let mut raw = enc.into_raw_encoder();
1071 raw.write_raw(&raw_batch).unwrap();
1072 let combined = raw.into_inner();
1073
1074 let mut dec = Decoder::new(&combined).unwrap();
1075 let events: Vec<_> = dec
1076 .decode_all()
1077 .into_iter()
1078 .filter_map(|f| match f {
1079 DecodedFrame::Event {
1080 timestamp_ns,
1081 values,
1082 ..
1083 } => Some((timestamp_ns, values)),
1084 _ => None,
1085 })
1086 .collect();
1087
1088 assert_eq!(events.len(), 2);
1089 assert_eq!(events[0].0, Some(1_000));
1090 assert_eq!(events[0].1, vec![FieldValue::Varint(1)]);
1091 assert_eq!(events[1].0, Some(2_000));
1092 assert_eq!(events[1].1, vec![FieldValue::Varint(2)]);
1093 }
1094}