1use crate::TraceEvent;
8use crate::codec::{self, PoolEntry, WireTypeId};
9use crate::schema::{SchemaEntry, SchemaRegistry};
10use crate::types::{CountingWriter, EncodeState, EventEncoder, InternedString};
11use std::any::TypeId;
12use std::collections::HashMap;
13use std::hash::{BuildHasherDefault, Hasher};
14use std::io::{self, Write};
15use std::sync::Arc;
16
/// Small, fast, non-cryptographic hasher used for this module's lookup tables.
///
/// The state is a single `u64` that starts at zero (derived `Default`), so the
/// result is fully deterministic: there is no per-process random key. Every
/// input word is folded in with a rotate / xor / multiply step.
#[doc(hidden)]
#[derive(Default)]
pub struct FxHasher(u64);

impl FxHasher {
    /// Multiplier applied after each word has been xor-ed into the state.
    const MULTIPLIER: u64 = 0x517cc1b727220a95;

    /// Folds one 64-bit word into the running state.
    #[inline]
    fn absorb(&mut self, word: u64) {
        let rotated = self.0.rotate_left(5);
        self.0 = (rotated ^ word).wrapping_mul(Self::MULTIPLIER);
    }
}

impl Hasher for FxHasher {
    /// Hashes a byte slice one byte at a time, each byte widened to `u64`.
    #[inline]
    fn write(&mut self, bytes: &[u8]) {
        bytes.iter().for_each(|&byte| self.absorb(u64::from(byte)));
    }

    /// Folds a full 64-bit word into the state in a single step.
    #[inline]
    fn write_u64(&mut self, i: u64) {
        self.absorb(i);
    }

    /// Widens to `u64` and folds in as one word.
    #[inline]
    fn write_u32(&mut self, i: u32) {
        self.absorb(u64::from(i));
    }

    /// Widens to `u64` and folds in as one word.
    #[inline]
    fn write_u16(&mut self, i: u16) {
        self.absorb(u64::from(i));
    }

    /// Converts to `u64` and folds in as one word.
    #[inline]
    fn write_usize(&mut self, i: usize) {
        self.absorb(i as u64);
    }

    /// Folds in the low 64 bits first, then the high 64 bits.
    #[inline]
    fn write_u128(&mut self, i: u128) {
        let low = i as u64;
        let high = (i >> 64) as u64;
        self.absorb(low);
        self.absorb(high);
    }

    /// Returns the current state unchanged; no finalisation step is applied.
    #[inline]
    fn finish(&self) -> u64 {
        self.0
    }
}
64
/// Hasher factory that hands out default-initialised [`FxHasher`] values.
#[doc(hidden)]
pub type FxBuildHasher = BuildHasherDefault<FxHasher>;
/// A [`HashMap`] that hashes its keys with [`FxHasher`] instead of the
/// standard library's default hasher.
#[doc(hidden)]
pub type FxHashMap<K, V> = HashMap<K, V, FxBuildHasher>;
69
/// Handle to an event schema: a name plus an ordered list of field definitions.
///
/// Cloning is cheap because both members are reference-counted.
#[derive(Clone, Debug)]
pub struct Schema {
    /// Shared schema definition (name, timestamp flag and fields).
    pub(crate) entry: Arc<SchemaEntry>,
    /// The schema name as an `Arc<str>`; the encoder clones it to build its
    /// by-name lookup key without copying the string.
    name_key: Arc<str>,
}
85
86impl Schema {
87 pub fn new(name: &str, fields: Vec<crate::schema::FieldDef>) -> Self {
92 let name_key: Arc<str> = Arc::from(name);
93 Self {
94 entry: Arc::new(SchemaEntry {
95 name: name.to_string(),
96 has_timestamp: true,
97 fields,
98 }),
99 name_key,
100 }
101 }
102
103 pub fn name(&self) -> &str {
105 &self.entry.name
106 }
107
108 pub fn fields(&self) -> &[crate::schema::FieldDef] {
110 &self.entry.fields
111 }
112}
113
/// Lookup key for the encoder's table of already-registered schemas.
#[derive(Clone, PartialEq, Eq, Hash)]
enum SchemaKey {
    /// Schema identified by its name.
    Name(Arc<str>),
    /// Schema identified by the Rust type that produced it (used by `Encoder::write`).
    RustType(TypeId),
}
121
/// Streaming trace encoder that writes frames to `W` (a `Vec<u8>` by default).
///
/// Besides the output it remembers which schemas and interned strings it has
/// already emitted, so registration and `intern_string` do not re-emit them.
pub struct Encoder<W: Write = Vec<u8>> {
    /// Output writer together with the timestamp encoding state.
    state: EncodeState<W>,
    /// Schemas registered so far, looked up by wire type id.
    registry: SchemaRegistry,
    /// Interned strings mapped to the pool id they were emitted with.
    string_pool: FxHashMap<String, u32>,
    /// Pool id the next newly interned string will receive.
    next_pool_id: u32,
    /// Wire type id of every schema known to this encoder, keyed by name or Rust type.
    schema_ids: FxHashMap<SchemaKey, WireTypeId>,
}
136
137impl Default for Encoder<Vec<u8>> {
138 fn default() -> Self {
139 Self::new()
140 }
141}
142
143impl Encoder<Vec<u8>> {
144 pub fn new() -> Self {
145 let mut buf = Vec::new();
146 codec::encode_header(&mut buf).expect("Vec::write_all cannot fail");
147 Self {
148 state: EncodeState::new(buf),
149 registry: SchemaRegistry::new(),
150 string_pool: FxHashMap::default(),
151 next_pool_id: 0,
152 schema_ids: FxHashMap::default(),
153 }
154 }
155
156 pub fn finish(self) -> Vec<u8> {
158 self.state.writer.into_inner()
159 }
160}
161
162impl<W: Write> Encoder<W> {
163 pub fn new_to(mut writer: W) -> io::Result<Self> {
166 codec::encode_header(&mut writer)?;
167 Ok(Self {
168 state: EncodeState::new(writer),
169 registry: SchemaRegistry::new(),
170 string_pool: FxHashMap::default(),
171 next_pool_id: 0,
172 schema_ids: FxHashMap::default(),
173 })
174 }
175
176 pub(crate) fn from_decoder(
179 mut registry: SchemaRegistry,
180 string_pool: crate::decoder::StringPool,
181 timestamp_base_ns: u64,
182 writer: W,
183 ) -> Self {
184 let mut pool = FxHashMap::default();
185 let mut next_pool_id: u32 = 0;
186 for (id, value) in string_pool.0.into_iter() {
187 pool.insert(value, id.raw_id());
188 if id.raw_id() >= next_pool_id {
189 next_pool_id = id.raw_id() + 1;
190 }
191 }
192
193 let mut schema_ids = FxHashMap::default();
194 for (wire_id, entry) in registry.entries() {
195 schema_ids.insert(SchemaKey::Name(Arc::from(entry.name.as_str())), wire_id);
196 }
197 registry.sync_next_id();
198
199 let mut state = EncodeState::new(writer);
200 state.set_ts_base_unchecked(timestamp_base_ns);
201
202 Self {
203 state,
204 registry,
205 string_pool: pool,
206 next_pool_id,
207 schema_ids,
208 }
209 }
210
211 pub fn into_inner(self) -> W {
213 self.state.writer.into_inner()
214 }
215
216 pub fn as_inner(&self) -> &W {
218 self.state.writer.inner()
219 }
220
221 pub fn bytes_written(&self) -> u64 {
223 self.state.writer.bytes_written()
224 }
225
226 pub fn reset_to(&mut self, mut new_writer: W) -> io::Result<W> {
229 codec::encode_header(&mut new_writer)?;
230 self.string_pool.clear();
231 self.next_pool_id = 0;
232 self.registry.clear();
233 self.schema_ids.clear();
234 let old_state = std::mem::replace(&mut self.state, EncodeState::new(new_writer));
236 Ok(old_state.writer.into_inner())
237 }
238
239 fn ensure_registered(&mut self, schema: &Schema) -> io::Result<WireTypeId> {
245 let key = SchemaKey::Name(Arc::clone(&schema.name_key));
246 if let Some(&wire_id) = self.schema_ids.get(&key) {
247 let Some(existing) = self.registry.get(wire_id) else {
249 return Err(io::Error::other(format!(
250 "corrupted internal state. {wire_id:?} in schema_ids but not in registry."
251 )));
252 };
253 if *existing == *schema.entry {
254 return Ok(wire_id);
255 }
256 return Err(io::Error::new(
257 io::ErrorKind::InvalidInput,
258 format!(
259 "schema already registered with different definition: {}",
260 schema.name()
261 ),
262 ));
263 }
264 let id = self.registry.next_type_id();
265 codec::encode_schema(id, &schema.entry, &mut self.state.writer)?;
266 self.registry
267 .register(id, (*schema.entry).clone())
268 .expect("schema registration failed");
269 self.schema_ids.insert(key, id);
270 Ok(id)
271 }
272
273 pub fn register_schema(
283 &mut self,
284 name: &str,
285 fields: Vec<crate::schema::FieldDef>,
286 ) -> io::Result<Schema> {
287 let schema = Schema::new(name, fields);
288 self.ensure_registered(&schema)?;
289 Ok(schema)
290 }
291
292 pub fn write_event(
301 &mut self,
302 schema: &Schema,
303 values: &[crate::types::FieldValue],
304 ) -> io::Result<()> {
305 use crate::types::FieldValue;
306
307 let type_id = self.ensure_registered(schema)?;
308 let expected_fields = schema.entry.fields.len();
309
310 let ts_ns = match values.first() {
311 Some(FieldValue::Varint(ns)) => *ns,
312 _ => {
313 return Err(io::Error::new(
314 io::ErrorKind::InvalidInput,
315 "first value must be FieldValue::Varint(timestamp_ns)",
316 ));
317 }
318 };
319 let field_values = &values[1..];
320
321 if field_values.len() != expected_fields {
322 return Err(io::Error::new(
323 io::ErrorKind::InvalidInput,
324 format!(
325 "value count ({}) does not match schema field count ({}) for schema '{}'",
326 field_values.len(),
327 expected_fields,
328 schema.name(),
329 ),
330 ));
331 }
332
333 let ts_delta = self.state.encode_timestamp_delta(ts_ns)?;
334 self.state.writer.write_all(&[codec::TAG_EVENT])?;
335 self.state.writer.write_all(&type_id.0.to_le_bytes())?;
336 codec::encode_u24_le(ts_delta, &mut self.state.writer)?;
337 let mut enc = EventEncoder::new(&mut self.state);
338 for (i, v) in field_values.iter().enumerate() {
339 enc.write_field_value(v, schema.entry.fields[i].field_type)?;
340 }
341 Ok(())
342 }
343
344 pub fn write<T: TraceEvent + 'static>(&mut self, event: &T) -> io::Result<()> {
347 let key = SchemaKey::RustType(TypeId::of::<T>());
348 let tid = if let Some(&cached) = self.schema_ids.get(&key) {
349 cached
350 } else {
351 let entry = T::schema_entry();
352 let schema = Schema::new(&entry.name, entry.fields);
353 let id = self.ensure_registered(&schema)?;
354 self.schema_ids.insert(key, id);
355 id
356 };
357 let ts_ns = event.timestamp();
358 let ts_delta = self.state.encode_timestamp_delta(ts_ns)?;
359 self.state.writer.write_all(&[codec::TAG_EVENT])?;
360 self.state.writer.write_all(&tid.0.to_le_bytes())?;
361 codec::encode_u24_le(ts_delta, &mut self.state.writer)?;
362 let mut enc = EventEncoder::new(&mut self.state);
363 event.encode_fields(&mut enc)
364 }
365
366 pub fn intern_string(&mut self, s: &str) -> io::Result<InternedString> {
368 if let Some(&id) = self.string_pool.get(s) {
369 return Ok(InternedString(id));
370 }
371 let id = self.next_pool_id;
372 self.next_pool_id += 1;
373 self.string_pool.insert(s.to_string(), id);
374 codec::encode_string_pool(
375 &[PoolEntry {
376 pool_id: id,
377 data: s.as_bytes().to_vec(),
378 }],
379 &mut self.state.writer,
380 )?;
381 Ok(InternedString(id))
382 }
383
384 pub fn write_string_pool(&mut self, entries: &[PoolEntry]) -> io::Result<()> {
385 codec::encode_string_pool(entries, &mut self.state.writer)
386 }
387
388 pub fn flush(&mut self) -> io::Result<()> {
390 self.state.writer.flush()
391 }
392
393 pub fn into_raw_encoder(self) -> RawEncoder<W> {
400 RawEncoder {
401 writer: self.state.writer,
402 }
403 }
404}
405
/// Writer for pre-encoded bytes, obtained from `Encoder::into_raw_encoder`.
///
/// It only forwards bytes and counts them; it keeps no schema or string state.
pub struct RawEncoder<W> {
    /// Counting writer taken over from the encoder.
    writer: CountingWriter<W>,
}
415
416impl<W: Write> RawEncoder<W> {
417 pub fn write_raw(&mut self, bytes: &[u8]) -> io::Result<()> {
419 self.writer.write_all(bytes)
420 }
421
422 pub fn bytes_written(&self) -> u64 {
425 self.writer.bytes_written()
426 }
427
428 pub fn flush(&mut self) -> io::Result<()> {
430 self.writer.flush()
431 }
432
433 pub fn into_inner(self) -> W {
435 self.writer.into_inner()
436 }
437}
438
439impl Encoder<Vec<u8>> {
440 pub fn write_infallible<T: TraceEvent + 'static>(&mut self, event: &T) {
441 self.write(event).expect("writing to Vec<u8> is infallible")
442 }
443
444 pub fn intern_string_infallible(&mut self, s: &str) -> InternedString {
445 self.intern_string(s)
446 .expect("interning into Vec<u8> is infallible")
447 }
448
449 pub fn reset_to_infallible(&mut self, data: Vec<u8>) -> Vec<u8> {
451 self.reset_to(data)
452 .expect("writing to Vec<u8> is infallible")
453 }
454}
455
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::FieldDef;
    use crate::types::{FieldType, FieldValue};

    /// A fresh encoder's output begins with the bytes `TRC\0` followed by `1`.
    #[test]
    fn encoder_writes_header() {
        let enc = Encoder::new();
        let data = enc.finish();
        assert_eq!(&data[..5], &[0x54, 0x52, 0x43, 0x00, 1]);
    }

    /// Registering a schema and writing one event produces output beyond the header.
    #[test]
    fn encoder_register_and_write_event() {
        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "Ev",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();
        enc.write_event(
            &schema,
            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
        )
        .unwrap();
        let data = enc.finish();
        assert!(data.len() > 5);
    }

    /// Registering an identical schema twice succeeds and emits only one schema frame.
    #[test]
    fn idempotent_re_registration() {
        use crate::decoder::{DecodedFrame, Decoder};

        let mut enc = Encoder::new();
        let fields = vec![FieldDef {
            name: "v".into(),
            field_type: FieldType::Varint,
        }];
        let s1 = enc.register_schema("Ev", fields.clone()).unwrap();
        let s2 = enc.register_schema("Ev", fields).unwrap();
        assert_eq!(s1.name(), s2.name());

        let bytes = enc.finish();
        let mut dec = Decoder::new(&bytes).unwrap();
        let frames = dec.decode_all();
        let schema_frames = frames
            .iter()
            .filter(|f| matches!(f, DecodedFrame::Schema(_)))
            .count();
        assert_eq!(
            schema_frames, 1,
            "second registration must not emit another schema frame"
        );
    }

    /// Re-registering a name with a different definition is rejected.
    #[test]
    fn re_registration_different_schema_errors() {
        let mut enc = Encoder::new();
        enc.register_schema(
            "Ev",
            vec![FieldDef {
                name: "v".into(),
                field_type: FieldType::Varint,
            }],
        )
        .unwrap();
        let result = enc.register_schema(
            "Ev",
            vec![FieldDef {
                name: "different".into(),
                field_type: FieldType::Bool,
            }],
        );
        assert!(result.is_err());
    }

    /// A schema built without an encoder is registered on its first `write_event`.
    #[test]
    fn schema_auto_registers_on_write() {
        use crate::decoder::{DecodedFrame, Decoder};

        let schema = Schema::new(
            "Lazy",
            vec![FieldDef {
                name: "v".into(),
                field_type: FieldType::Varint,
            }],
        );

        let mut enc = Encoder::new();
        enc.write_event(
            &schema,
            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
        )
        .unwrap();

        let bytes = enc.finish();
        let mut dec = Decoder::new(&bytes).unwrap();
        let frames = dec.decode_all();
        assert!(matches!(&frames[0], DecodedFrame::Schema(s) if s.name == "Lazy"));
        if let DecodedFrame::Event { values, .. } = &frames[1] {
            assert_eq!(*values, vec![FieldValue::Varint(42)]);
        } else {
            panic!("expected event");
        }
    }

    /// One `Schema` handle can be used with several independent encoders.
    #[test]
    fn schema_portable_across_encoders() {
        use crate::decoder::{DecodedFrame, Decoder};

        let mut enc1 = Encoder::new();
        let schema = enc1
            .register_schema(
                "Shared",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();
        enc1.write_event(&schema, &[FieldValue::Varint(1_000), FieldValue::Varint(1)])
            .unwrap();

        // The second encoder has never seen the schema and must register it itself.
        let mut enc2 = Encoder::new();
        enc2.write_event(&schema, &[FieldValue::Varint(2_000), FieldValue::Varint(2)])
            .unwrap();

        for (enc, expected_val) in [(enc1, 1u64), (enc2, 2u64)] {
            let bytes = enc.finish();
            let mut dec = Decoder::new(&bytes).unwrap();
            let frames = dec.decode_all();
            let event = frames
                .iter()
                .find(|f| matches!(f, DecodedFrame::Event { .. }))
                .unwrap();
            if let DecodedFrame::Event { values, .. } = event {
                assert_eq!(values[0], FieldValue::Varint(expected_val));
            }
        }
    }

    /// Interning the same string twice yields the same id; a new string gets a new id.
    #[test]
    fn encoder_intern_string_deduplicates() {
        let mut enc = Encoder::new();
        let id1 = enc.intern_string("hello").unwrap();
        let id2 = enc.intern_string("hello").unwrap();
        let id3 = enc.intern_string("world").unwrap();
        assert_eq!(id1, id2);
        assert_ne!(id1, id3);
    }

    /// Timestamps survive an encode/decode round trip, including backwards and large jumps.
    #[test]
    fn timestamp_round_trip() {
        use crate::decoder::{DecodedFrame, Decoder};

        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "TS",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();

        let ts1 = 100_000u64;
        let ts2 = 50_000u64;
        let ts3 = 200_000_000u64;
        let ts4 = 100_000_000u64;
        enc.write_event(&schema, &[FieldValue::Varint(ts1), FieldValue::Varint(1)])
            .unwrap();
        enc.write_event(&schema, &[FieldValue::Varint(ts2), FieldValue::Varint(2)])
            .unwrap();
        enc.write_event(&schema, &[FieldValue::Varint(ts3), FieldValue::Varint(3)])
            .unwrap();
        enc.write_event(&schema, &[FieldValue::Varint(ts4), FieldValue::Varint(4)])
            .unwrap();

        let bytes = enc.finish();
        let mut dec = Decoder::new(&bytes).unwrap();
        let events: Vec<_> = dec
            .decode_all()
            .into_iter()
            .filter_map(|f| match f {
                DecodedFrame::Event {
                    timestamp_ns,
                    values,
                    ..
                } => Some((timestamp_ns, values)),
                _ => None,
            })
            .collect();

        assert_eq!(events.len(), 4);
        assert_eq!(events[0].0, Some(ts1));
        assert_eq!(events[0].1, vec![FieldValue::Varint(1)]);
        assert_eq!(events[1].0, Some(ts2));
        assert_eq!(events[1].1, vec![FieldValue::Varint(2)]);
        assert_eq!(events[2].0, Some(ts3));
        assert_eq!(events[2].1, vec![FieldValue::Varint(3)]);
        assert_eq!(events[3].0, Some(ts4));
        assert_eq!(events[3].1, vec![FieldValue::Varint(4)]);
    }

    /// `new_to` writes the header into a caller-supplied writer.
    #[test]
    fn encoder_new_to_writer() {
        let mut buf = Vec::new();
        let enc = Encoder::new_to(&mut buf).unwrap();
        drop(enc);
        assert!(buf.len() >= 5);
        assert_eq!(&buf[..5], &[0x54, 0x52, 0x43, 0x00, 1]);
    }

    /// An encoder derived from a decoder appends frames that decode together
    /// with the original bytes.
    #[test]
    fn decoder_into_encoder_appends_without_header() {
        use crate::decoder::{DecodedFrame, Decoder};

        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "Ev",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();
        enc.write_event(&schema, &[FieldValue::Varint(1_000), FieldValue::Varint(1)])
            .unwrap();
        let base = enc.finish();

        let mut decoder = Decoder::new(&base).unwrap();
        // Drain every frame so the decoder has seen the whole base stream.
        while decoder.next_frame_ref().ok().flatten().is_some() {}
        let mut output = Vec::new();
        let mut ext = decoder.into_encoder(&mut output);
        ext.write_event(&schema, &[FieldValue::Varint(2_000), FieldValue::Varint(2)])
            .unwrap();
        drop(ext);

        // Base bytes followed by the appended bytes must form one decodable stream.
        let mut combined = base.clone();
        combined.extend_from_slice(&output);
        let mut dec = Decoder::new(&combined).unwrap();
        let events: Vec<_> = dec
            .decode_all()
            .into_iter()
            .filter_map(|f| match f {
                DecodedFrame::Event {
                    timestamp_ns,
                    values,
                    ..
                } => Some((timestamp_ns, values)),
                _ => None,
            })
            .collect();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].0, Some(1_000));
        assert_eq!(events[1].0, Some(2_000));
    }

    /// Strings interned in the base stream keep their ids in the derived encoder.
    #[test]
    fn decoder_into_encoder_deduplicates_interned_strings() {
        use crate::decoder::{DecodedFrame, Decoder};

        let mut enc = Encoder::new();
        let id1 = enc.intern_string("hello").unwrap();
        let base = enc.finish();

        let mut decoder = Decoder::new(&base).unwrap();
        // Drain every frame so the decoder has seen the whole base stream.
        while decoder.next_frame_ref().ok().flatten().is_some() {}
        let mut output = Vec::new();
        let mut ext = decoder.into_encoder(&mut output);
        let id2 = ext.intern_string("hello").unwrap();
        let id3 = ext.intern_string("world").unwrap();
        drop(ext);

        assert_eq!(id1, id2, "existing string should reuse pool ID");
        assert_ne!(id2, id3);

        let mut combined = base.clone();
        combined.extend_from_slice(&output);
        let mut dec = Decoder::new(&combined).unwrap();
        let frames = dec.decode_all();
        let pool_frames: Vec<_> = frames
            .iter()
            .filter(|f| matches!(f, DecodedFrame::StringPool(_)))
            .collect();
        // One frame for "hello" in the base stream, one for "world" in the extension.
        assert_eq!(pool_frames.len(), 2);
    }

    /// A multi-field event decodes to the same timestamp and values.
    #[test]
    fn register_and_write() {
        use crate::decoder::{DecodedFrame, Decoder};

        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "MyEvent",
                vec![
                    FieldDef {
                        name: "count".into(),
                        field_type: FieldType::Varint,
                    },
                    FieldDef {
                        name: "name".into(),
                        field_type: FieldType::String,
                    },
                ],
            )
            .unwrap();

        enc.write_event(
            &schema,
            &[
                FieldValue::Varint(1_000_000),
                FieldValue::Varint(42),
                FieldValue::String("hello".into()),
            ],
        )
        .unwrap();

        let bytes = enc.finish();
        let mut dec = Decoder::new(&bytes).unwrap();
        let frames = dec.decode_all();
        let events: Vec<_> = frames
            .into_iter()
            .filter_map(|f| match f {
                DecodedFrame::Event {
                    timestamp_ns,
                    values,
                    ..
                } => Some((timestamp_ns, values)),
                _ => None,
            })
            .collect();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].0, Some(1_000_000));
        assert_eq!(events[0].1[0], FieldValue::Varint(42));
        assert_eq!(events[0].1[1], FieldValue::String("hello".into()));
    }

    /// Registering a conflicting definition under an existing name fails.
    #[test]
    fn register_conflict_errors() {
        let mut enc = Encoder::new();
        enc.register_schema(
            "Ev",
            vec![FieldDef {
                name: "v".into(),
                field_type: FieldType::Varint,
            }],
        )
        .unwrap();
        let result = enc.register_schema(
            "Ev",
            vec![FieldDef {
                name: "other".into(),
                field_type: FieldType::Bool,
            }],
        );
        assert!(result.is_err());
    }

    /// Supplying more field values than the schema declares is an error.
    #[test]
    fn write_wrong_field_count_errors() {
        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "Ev",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();
        // Timestamp plus two values against a one-field schema.
        let result = enc.write_event(
            &schema,
            &[
                FieldValue::Varint(0),
                FieldValue::Varint(1),
                FieldValue::Varint(2),
            ],
        );
        assert!(result.is_err());
    }

    /// Two events 12 ms apart must both round-trip without any 0x05 byte in the output.
    #[test]
    fn timestamp_base_advances_per_event() {
        use crate::decoder::{DecodedFrame, Decoder};

        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "Ev",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();

        let ts1 = 12_000_000u64;
        let ts2 = 24_000_000u64;
        enc.write_event(&schema, &[FieldValue::Varint(ts1), FieldValue::Varint(1)])
            .unwrap();
        enc.write_event(&schema, &[FieldValue::Varint(ts2), FieldValue::Varint(2)])
            .unwrap();

        let bytes = enc.finish();

        // NOTE(review): 0x05 is presumably the timestamp-reset frame tag — confirm
        // against the codec. This scan also matches any payload byte equal to
        // 0x05, so it only holds for inputs that happen to contain none.
        let reset_count = bytes.iter().filter(|&&b| b == 0x05).count();
        assert_eq!(
            reset_count, 0,
            "base should advance per event, avoiding unnecessary resets"
        );

        let mut dec = Decoder::new(&bytes).unwrap();
        let events: Vec<_> = dec
            .decode_all()
            .into_iter()
            .filter_map(|f| match f {
                DecodedFrame::Event { timestamp_ns, .. } => timestamp_ns,
                _ => None,
            })
            .collect();
        assert_eq!(events, vec![ts1, ts2]);
    }

    /// `reset_to` clears the string pool in place, keeping its allocated capacity.
    #[test]
    fn reset_to_preserves_capacity() {
        let mut enc = Encoder::new();
        for i in 0..100 {
            enc.intern_string(&format!("string_{}", i)).unwrap();
        }
        let cap_before = enc.string_pool.capacity();
        // Unwrap so a failed reset cannot make the capacity check pass vacuously.
        let _old = enc.reset_to(Vec::new()).unwrap();
        let cap_after = enc.string_pool.capacity();
        assert_eq!(
            cap_before, cap_after,
            "string_pool capacity should be preserved after reset_to"
        );
    }

    /// `reset_to` hands back the old stream intact and the new stream is
    /// self-contained: schemas and strings are emitted again.
    #[test]
    fn reset_to_returns_old_data_and_clears_state() {
        use crate::decoder::{DecodedFrame, Decoder};

        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "Ev",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();
        enc.write_event(
            &schema,
            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
        )
        .unwrap();
        let _s = enc.intern_string("hello").unwrap();

        let old_bytes_written = enc.bytes_written();
        assert!(old_bytes_written > 0);

        let old = enc.reset_to_infallible(Vec::new());

        // The returned buffer is a complete stream with all three frame kinds.
        let mut dec = Decoder::new(&old).unwrap();
        let frames = dec.decode_all();
        assert!(frames.iter().any(|f| matches!(f, DecodedFrame::Schema(_))));
        assert!(
            frames
                .iter()
                .any(|f| matches!(f, DecodedFrame::Event { .. }))
        );
        assert!(
            frames
                .iter()
                .any(|f| matches!(f, DecodedFrame::StringPool(_)))
        );

        assert!(
            enc.bytes_written() < old_bytes_written,
            "bytes_written should reset (got {} vs old {})",
            enc.bytes_written(),
            old_bytes_written
        );

        // Reusing the old schema handle must re-register it in the new stream.
        enc.write_event(
            &schema,
            &[FieldValue::Varint(2_000), FieldValue::Varint(99)],
        )
        .unwrap();

        // The pool was cleared, so this string must be emitted again.
        let _s2 = enc.intern_string("hello").unwrap();

        let new_bytes = enc.reset_to_infallible(Vec::new());
        let mut dec2 = Decoder::new(&new_bytes).unwrap();
        let new_frames = dec2.decode_all();
        assert!(
            new_frames
                .iter()
                .any(|f| matches!(f, DecodedFrame::Schema(s) if s.name == "Ev")),
            "new trace must contain schema definition"
        );
        assert!(
            new_frames
                .iter()
                .any(|f| matches!(f, DecodedFrame::StringPool(_))),
            "new trace must contain string pool"
        );
        let event = new_frames
            .iter()
            .find_map(|f| match f {
                DecodedFrame::Event {
                    timestamp_ns,
                    values,
                    ..
                } => Some((timestamp_ns, values)),
                _ => None,
            })
            .expect("new trace must contain event");
        assert_eq!(*event.0, Some(2_000));
        assert_eq!(event.1[0], FieldValue::Varint(99));
    }

    /// Converting to a `RawEncoder` keeps the running byte count.
    #[test]
    fn into_raw_encoder_preserves_byte_count() {
        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "Ev",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();
        enc.write_event(
            &schema,
            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
        )
        .unwrap();

        let bytes_before = enc.bytes_written();
        assert!(bytes_before > 0);

        let raw = enc.into_raw_encoder();
        assert_eq!(
            raw.bytes_written(),
            bytes_before,
            "byte count must be preserved across conversion"
        );
    }

    /// Raw writes are added to the byte count.
    #[test]
    fn raw_encoder_write_raw_and_bytes_written() {
        let enc = Encoder::new();
        let initial = enc.bytes_written();
        let mut raw = enc.into_raw_encoder();

        let payload = [0xAA; 100];
        raw.write_raw(&payload).unwrap();

        assert_eq!(
            raw.bytes_written(),
            initial + payload.len() as u64,
            "bytes_written must include raw payload"
        );
    }

    /// Bytes written through the encoder and through `write_raw` both end up
    /// in the writer returned by `into_inner`.
    #[test]
    fn raw_encoder_into_inner_returns_all_data() {
        use crate::decoder::{DecodedFrame, Decoder};

        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "Ev",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();
        enc.write_event(&schema, &[FieldValue::Varint(1_000), FieldValue::Varint(1)])
            .unwrap();

        // A complete, separately encoded stream used as the raw payload.
        let raw_batch = {
            let mut batch_enc = Encoder::new();
            batch_enc
                .write_event(&schema, &[FieldValue::Varint(2_000), FieldValue::Varint(2)])
                .unwrap();
            batch_enc.finish()
        };

        let mut raw = enc.into_raw_encoder();
        raw.write_raw(&raw_batch).unwrap();
        let combined = raw.into_inner();

        let mut dec = Decoder::new(&combined).unwrap();
        let events: Vec<_> = dec
            .decode_all()
            .into_iter()
            .filter_map(|f| match f {
                DecodedFrame::Event {
                    timestamp_ns,
                    values,
                    ..
                } => Some((timestamp_ns, values)),
                _ => None,
            })
            .collect();

        assert_eq!(events.len(), 2);
        assert_eq!(events[0].0, Some(1_000));
        assert_eq!(events[0].1, vec![FieldValue::Varint(1)]);
        assert_eq!(events[1].0, Some(2_000));
        assert_eq!(events[1].1, vec![FieldValue::Varint(2)]);
    }
}