1use crate::TraceEvent;
4use crate::codec::{self, PoolEntry, WireTypeId};
5use crate::schema::{SchemaEntry, SchemaRegistry};
6use crate::types::{EncodeState, EventEncoder, InternedString};
7use std::any::TypeId;
8use std::collections::HashMap;
9use std::io::{self, Write};
10use std::sync::Arc;
11
12#[derive(Clone, Debug)]
21pub struct Schema {
22 entry: Arc<SchemaEntry>,
23 name_key: Arc<str>,
26}
27
28impl Schema {
29 pub fn new(name: &str, fields: Vec<crate::schema::FieldDef>) -> Self {
34 let name_key: Arc<str> = Arc::from(name);
35 Self {
36 entry: Arc::new(SchemaEntry {
37 name: name.to_string(),
38 has_timestamp: true,
39 fields,
40 }),
41 name_key,
42 }
43 }
44
45 pub fn name(&self) -> &str {
47 &self.entry.name
48 }
49
50 pub fn fields(&self) -> &[crate::schema::FieldDef] {
52 &self.entry.fields
53 }
54}
55
/// Lookup key for the encoder's schema-id cache.
///
/// A wire id can be found either by schema name (for `Schema` handles) or by the Rust
/// type that produced the schema (for `TraceEvent` implementors passed to `Encoder::write`).
#[derive(Hash, Eq, PartialEq, Clone)]
enum SchemaKey {
    /// Keyed by the schema's name.
    Name(Arc<str>),
    /// Keyed by the `TypeId` of a `TraceEvent` implementor.
    RustType(TypeId),
}
63
64pub struct Encoder<W: Write = Vec<u8>> {
65 state: EncodeState<W>,
66 registry: SchemaRegistry,
67 string_pool: HashMap<String, u32>,
68 next_pool_id: u32,
69 schema_ids: HashMap<SchemaKey, WireTypeId>,
70}
71
72impl Default for Encoder<Vec<u8>> {
73 fn default() -> Self {
74 Self::new()
75 }
76}
77
78impl Encoder<Vec<u8>> {
79 pub fn new() -> Self {
80 let mut buf = Vec::new();
81 codec::encode_header(&mut buf).expect("Vec::write_all cannot fail");
82 Self {
83 state: EncodeState::new(buf),
84 registry: SchemaRegistry::new(),
85 string_pool: HashMap::new(),
86 next_pool_id: 0,
87 schema_ids: HashMap::new(),
88 }
89 }
90
91 pub fn finish(self) -> Vec<u8> {
93 self.state.writer.into_inner()
94 }
95}
96
97impl<W: Write> Encoder<W> {
98 pub fn new_to(mut writer: W) -> io::Result<Self> {
101 codec::encode_header(&mut writer)?;
102 Ok(Self {
103 state: EncodeState::new(writer),
104 registry: SchemaRegistry::new(),
105 string_pool: HashMap::new(),
106 next_pool_id: 0,
107 schema_ids: HashMap::new(),
108 })
109 }
110
111 pub(crate) fn from_decoder(
114 mut registry: SchemaRegistry,
115 string_pool: crate::decoder::StringPool,
116 timestamp_base_ns: u64,
117 writer: W,
118 ) -> Self {
119 let mut pool = HashMap::new();
120 let mut next_pool_id: u32 = 0;
121 for (id, value) in string_pool.0.into_iter() {
122 pool.insert(value, id.raw_id());
123 if id.raw_id() >= next_pool_id {
124 next_pool_id = id.raw_id() + 1;
125 }
126 }
127
128 let mut schema_ids = HashMap::new();
129 for (wire_id, entry) in registry.entries() {
130 schema_ids.insert(SchemaKey::Name(Arc::from(entry.name.as_str())), wire_id);
131 }
132 registry.sync_next_id();
133
134 let mut state = EncodeState::new(writer);
135 state.timestamp_base_ns = timestamp_base_ns;
136
137 Self {
138 state,
139 registry,
140 string_pool: pool,
141 next_pool_id,
142 schema_ids,
143 }
144 }
145
146 pub fn into_inner(self) -> W {
148 self.state.writer.into_inner()
149 }
150
151 pub fn as_inner(&self) -> &W {
153 self.state.writer.inner()
154 }
155
156 pub fn bytes_written(&self) -> u64 {
158 self.state.writer.bytes_written()
159 }
160
161 fn ensure_registered(&mut self, schema: &Schema) -> io::Result<WireTypeId> {
167 let key = SchemaKey::Name(Arc::clone(&schema.name_key));
168 if let Some(&wire_id) = self.schema_ids.get(&key) {
169 let existing = self.registry.get(wire_id).unwrap();
170 if *existing == *schema.entry {
171 return Ok(wire_id);
172 }
173 return Err(io::Error::new(
174 io::ErrorKind::InvalidInput,
175 format!(
176 "schema already registered with different definition: {}",
177 schema.name()
178 ),
179 ));
180 }
181 let id = self.registry.next_type_id();
182 codec::encode_schema(id, &schema.entry, &mut self.state.writer)?;
183 self.registry
184 .register(id, (*schema.entry).clone())
185 .expect("schema registration failed");
186 self.schema_ids.insert(key, id);
187 Ok(id)
188 }
189
190 pub fn register_schema(
200 &mut self,
201 name: &str,
202 fields: Vec<crate::schema::FieldDef>,
203 ) -> io::Result<Schema> {
204 let schema = Schema::new(name, fields);
205 self.ensure_registered(&schema)?;
206 Ok(schema)
207 }
208
209 pub fn write_event(
218 &mut self,
219 schema: &Schema,
220 values: &[crate::types::FieldValue],
221 ) -> io::Result<()> {
222 use crate::types::FieldValue;
223
224 let type_id = self.ensure_registered(schema)?;
225 let expected_fields = schema.entry.fields.len();
226
227 let ts_ns = match values.first() {
228 Some(FieldValue::Varint(ns)) => *ns,
229 _ => {
230 return Err(io::Error::new(
231 io::ErrorKind::InvalidInput,
232 "first value must be FieldValue::Varint(timestamp_ns)",
233 ));
234 }
235 };
236 let field_values = &values[1..];
237
238 if field_values.len() != expected_fields {
239 return Err(io::Error::new(
240 io::ErrorKind::InvalidInput,
241 format!(
242 "value count ({}) does not match schema field count ({}) for schema '{}'",
243 field_values.len(),
244 expected_fields,
245 schema.name(),
246 ),
247 ));
248 }
249
250 let ts_delta = self.state.encode_timestamp_delta(ts_ns)?;
251 self.state.writer.write_all(&[codec::TAG_EVENT])?;
252 self.state.writer.write_all(&type_id.0.to_le_bytes())?;
253 codec::encode_u24_le(ts_delta, &mut self.state.writer)?;
254 let mut enc = EventEncoder::new(&mut self.state);
255 for v in field_values {
256 enc.write_field_value(v)?;
257 }
258 Ok(())
259 }
260
261 pub fn write<T: TraceEvent + 'static>(&mut self, event: &T) -> io::Result<()> {
264 let key = SchemaKey::RustType(TypeId::of::<T>());
265 let tid = if let Some(&cached) = self.schema_ids.get(&key) {
266 cached
267 } else {
268 let entry = T::schema_entry();
269 let schema = Schema::new(&entry.name, entry.fields);
270 let id = self.ensure_registered(&schema)?;
271 self.schema_ids.insert(key, id);
272 id
273 };
274 let ts_ns = event.timestamp();
275 let ts_delta = self.state.encode_timestamp_delta(ts_ns)?;
276 self.state.writer.write_all(&[codec::TAG_EVENT])?;
277 self.state.writer.write_all(&tid.0.to_le_bytes())?;
278 codec::encode_u24_le(ts_delta, &mut self.state.writer)?;
279 let mut enc = EventEncoder::new(&mut self.state);
280 event.encode_fields(&mut enc)
281 }
282
283 pub fn intern_string(&mut self, s: &str) -> io::Result<InternedString> {
285 if let Some(&id) = self.string_pool.get(s) {
286 return Ok(InternedString(id));
287 }
288 let id = self.next_pool_id;
289 self.next_pool_id += 1;
290 self.string_pool.insert(s.to_string(), id);
291 codec::encode_string_pool(
292 &[PoolEntry {
293 pool_id: id,
294 data: s.as_bytes().to_vec(),
295 }],
296 &mut self.state.writer,
297 )?;
298 Ok(InternedString(id))
299 }
300
301 pub fn write_string_pool(&mut self, entries: &[PoolEntry]) -> io::Result<()> {
302 codec::encode_string_pool(entries, &mut self.state.writer)
303 }
304
305 pub fn flush(&mut self) -> io::Result<()> {
307 self.state.writer.flush()
308 }
309}
310
#[cfg(test)]
mod tests {
    //! Round-trip and contract tests for `Encoder`, checked by decoding the output with
    //! `crate::decoder::Decoder` where frame contents matter.

    use super::*;
    use crate::schema::FieldDef;
    use crate::types::{FieldType, FieldValue};

    #[test]
    fn encoder_writes_header() {
        let enc = Encoder::new();
        let data = enc.finish();
        // "TRC\0" magic followed by what is presumably the format version byte.
        assert_eq!(&data[..5], &[0x54, 0x52, 0x43, 0x00, 1]);
    }

    #[test]
    fn encoder_register_and_write_event() {
        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "Ev",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();
        enc.write_event(
            &schema,
            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
        )
        .unwrap();
        let data = enc.finish();
        assert!(data.len() > 5);
    }

    /// Registering an identical schema twice succeeds and emits only one schema frame.
    #[test]
    fn idempotent_re_registration() {
        use crate::decoder::{DecodedFrame, Decoder};

        let mut enc = Encoder::new();
        let fields = vec![FieldDef {
            name: "v".into(),
            field_type: FieldType::Varint,
        }];
        let _s1 = enc.register_schema("Ev", fields.clone()).unwrap();
        let _s2 = enc.register_schema("Ev", fields).unwrap();

        let bytes = enc.finish();
        let mut dec = Decoder::new(&bytes).unwrap();
        let schema_frames = dec
            .decode_all()
            .iter()
            .filter(|f| matches!(f, DecodedFrame::Schema(_)))
            .count();
        assert_eq!(schema_frames, 1, "re-registration must not emit a second schema");
    }

    /// Same name, different field name and type: rejected.
    #[test]
    fn re_registration_different_schema_errors() {
        let mut enc = Encoder::new();
        enc.register_schema(
            "Ev",
            vec![FieldDef {
                name: "v".into(),
                field_type: FieldType::Varint,
            }],
        )
        .unwrap();
        let result = enc.register_schema(
            "Ev",
            vec![FieldDef {
                name: "different".into(),
                field_type: FieldType::Bool,
            }],
        );
        let err = result.unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput);
    }

    #[test]
    fn schema_auto_registers_on_write() {
        use crate::decoder::{DecodedFrame, Decoder};

        // Built without an encoder; the first write must emit its schema frame.
        let schema = Schema::new(
            "Lazy",
            vec![FieldDef {
                name: "v".into(),
                field_type: FieldType::Varint,
            }],
        );

        let mut enc = Encoder::new();
        enc.write_event(
            &schema,
            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
        )
        .unwrap();

        let bytes = enc.finish();
        let mut dec = Decoder::new(&bytes).unwrap();
        let frames = dec.decode_all();
        assert!(matches!(&frames[0], DecodedFrame::Schema(s) if s.name == "Lazy"));
        if let DecodedFrame::Event { values, .. } = &frames[1] {
            assert_eq!(*values, vec![FieldValue::Varint(42)]);
        } else {
            panic!("expected event");
        }
    }

    /// One `Schema` handle can be written through several independent encoders.
    #[test]
    fn schema_portable_across_encoders() {
        use crate::decoder::{DecodedFrame, Decoder};

        let mut enc1 = Encoder::new();
        let schema = enc1
            .register_schema(
                "Shared",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();
        enc1.write_event(&schema, &[FieldValue::Varint(1_000), FieldValue::Varint(1)])
            .unwrap();

        let mut enc2 = Encoder::new();
        enc2.write_event(&schema, &[FieldValue::Varint(2_000), FieldValue::Varint(2)])
            .unwrap();

        for (enc, expected_val) in [(enc1, 1u64), (enc2, 2u64)] {
            let bytes = enc.finish();
            let mut dec = Decoder::new(&bytes).unwrap();
            let frames = dec.decode_all();
            let event = frames
                .iter()
                .find(|f| matches!(f, DecodedFrame::Event { .. }))
                .unwrap();
            if let DecodedFrame::Event { values, .. } = event {
                assert_eq!(values[0], FieldValue::Varint(expected_val));
            }
        }
    }

    #[test]
    fn encoder_intern_string_deduplicates() {
        let mut enc = Encoder::new();
        let id1 = enc.intern_string("hello").unwrap();
        let id2 = enc.intern_string("hello").unwrap();
        let id3 = enc.intern_string("world").unwrap();
        assert_eq!(id1, id2);
        assert_ne!(id1, id3);
    }

    /// Timestamps that go backwards and that jump far ahead still round-trip exactly.
    #[test]
    fn timestamp_round_trip() {
        use crate::decoder::{DecodedFrame, Decoder};

        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "TS",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();

        let ts1 = 100_000u64;
        let ts2 = 50_000u64;
        let ts3 = 200_000_000u64;
        let ts4 = 100_000_000u64;
        enc.write_event(&schema, &[FieldValue::Varint(ts1), FieldValue::Varint(1)])
            .unwrap();
        enc.write_event(&schema, &[FieldValue::Varint(ts2), FieldValue::Varint(2)])
            .unwrap();
        enc.write_event(&schema, &[FieldValue::Varint(ts3), FieldValue::Varint(3)])
            .unwrap();
        enc.write_event(&schema, &[FieldValue::Varint(ts4), FieldValue::Varint(4)])
            .unwrap();

        let bytes = enc.finish();
        let mut dec = Decoder::new(&bytes).unwrap();
        let events: Vec<_> = dec
            .decode_all()
            .into_iter()
            .filter_map(|f| match f {
                DecodedFrame::Event {
                    timestamp_ns,
                    values,
                    ..
                } => Some((timestamp_ns, values)),
                _ => None,
            })
            .collect();

        assert_eq!(events.len(), 4);
        assert_eq!(events[0].0, Some(ts1));
        assert_eq!(events[0].1, vec![FieldValue::Varint(1)]);
        assert_eq!(events[1].0, Some(ts2));
        assert_eq!(events[1].1, vec![FieldValue::Varint(2)]);
        assert_eq!(events[2].0, Some(ts3));
        assert_eq!(events[2].1, vec![FieldValue::Varint(3)]);
        assert_eq!(events[3].0, Some(ts4));
        assert_eq!(events[3].1, vec![FieldValue::Varint(4)]);
    }

    #[test]
    fn encoder_new_to_writer() {
        let mut buf = Vec::new();
        let enc = Encoder::new_to(&mut buf).unwrap();
        drop(enc);
        assert!(buf.len() >= 5);
        assert_eq!(&buf[..5], &[0x54, 0x52, 0x43, 0x00, 1]);
    }

    /// An encoder built from a drained decoder appends frames (no header) that decode
    /// correctly after the original bytes.
    #[test]
    fn decoder_into_encoder_appends_without_header() {
        use crate::decoder::{DecodedFrame, Decoder};

        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "Ev",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();
        enc.write_event(&schema, &[FieldValue::Varint(1_000), FieldValue::Varint(1)])
            .unwrap();
        let base = enc.finish();

        let mut decoder = Decoder::new(&base).unwrap();
        // Drain every frame so the decoder's schema/pool/timestamp state is complete.
        while decoder.next_frame_ref().ok().flatten().is_some() {}
        let mut output = Vec::new();
        let mut ext = decoder.into_encoder(&mut output);
        ext.write_event(&schema, &[FieldValue::Varint(2_000), FieldValue::Varint(2)])
            .unwrap();
        drop(ext);

        let mut combined = base.clone();
        combined.extend_from_slice(&output);
        let mut dec = Decoder::new(&combined).unwrap();
        let events: Vec<_> = dec
            .decode_all()
            .into_iter()
            .filter_map(|f| match f {
                DecodedFrame::Event {
                    timestamp_ns,
                    values,
                    ..
                } => Some((timestamp_ns, values)),
                _ => None,
            })
            .collect();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].0, Some(1_000));
        assert_eq!(events[1].0, Some(2_000));
    }

    /// Strings interned before the handoff keep their ids; only new strings are emitted.
    #[test]
    fn decoder_into_encoder_deduplicates_interned_strings() {
        use crate::decoder::{DecodedFrame, Decoder};

        let mut enc = Encoder::new();
        let id1 = enc.intern_string("hello").unwrap();
        let base = enc.finish();

        let mut decoder = Decoder::new(&base).unwrap();
        while decoder.next_frame_ref().ok().flatten().is_some() {}
        let mut output = Vec::new();
        let mut ext = decoder.into_encoder(&mut output);
        let id2 = ext.intern_string("hello").unwrap();
        let id3 = ext.intern_string("world").unwrap();
        drop(ext);

        assert_eq!(id1, id2, "existing string should reuse pool ID");
        assert_ne!(id2, id3);

        let mut combined = base.clone();
        combined.extend_from_slice(&output);
        let mut dec = Decoder::new(&combined).unwrap();
        let frames = dec.decode_all();
        let pool_frames: Vec<_> = frames
            .iter()
            .filter(|f| matches!(f, DecodedFrame::StringPool(_)))
            .collect();
        // One pool frame for "hello" in `base`, one for "world" in `output`.
        assert_eq!(pool_frames.len(), 2);
    }

    #[test]
    fn register_and_write() {
        use crate::decoder::{DecodedFrame, Decoder};

        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "MyEvent",
                vec![
                    FieldDef {
                        name: "count".into(),
                        field_type: FieldType::Varint,
                    },
                    FieldDef {
                        name: "name".into(),
                        field_type: FieldType::String,
                    },
                ],
            )
            .unwrap();

        enc.write_event(
            &schema,
            &[
                FieldValue::Varint(1_000_000),
                FieldValue::Varint(42),
                FieldValue::String("hello".into()),
            ],
        )
        .unwrap();

        let bytes = enc.finish();
        let mut dec = Decoder::new(&bytes).unwrap();
        let frames = dec.decode_all();
        let events: Vec<_> = frames
            .into_iter()
            .filter_map(|f| match f {
                DecodedFrame::Event {
                    timestamp_ns,
                    values,
                    ..
                } => Some((timestamp_ns, values)),
                _ => None,
            })
            .collect();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].0, Some(1_000_000));
        assert_eq!(events[0].1[0], FieldValue::Varint(42));
        assert_eq!(events[0].1[1], FieldValue::String("hello".into()));
    }

    /// Same name and same leading field but an extra field is still a conflicting
    /// definition (complements `re_registration_different_schema_errors`).
    #[test]
    fn register_conflict_errors() {
        let mut enc = Encoder::new();
        enc.register_schema(
            "Ev",
            vec![FieldDef {
                name: "v".into(),
                field_type: FieldType::Varint,
            }],
        )
        .unwrap();
        let result = enc.register_schema(
            "Ev",
            vec![
                FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                },
                FieldDef {
                    name: "other".into(),
                    field_type: FieldType::Bool,
                },
            ],
        );
        let err = result.unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput);
    }

    #[test]
    fn write_wrong_field_count_errors() {
        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "Ev",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();
        // Timestamp plus two values for a one-field schema.
        let result = enc.write_event(
            &schema,
            &[
                FieldValue::Varint(0),
                FieldValue::Varint(1),
                FieldValue::Varint(2),
            ],
        );
        let err = result.unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput);
    }

    /// The second timestamp (24 ms) exceeds the 24-bit delta range from zero, so it only
    /// fits without a reset if the base moved to the first event's timestamp.
    #[test]
    fn timestamp_base_advances_per_event() {
        use crate::decoder::{DecodedFrame, Decoder};

        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "Ev",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();

        let ts1 = 12_000_000u64;
        let ts2 = 24_000_000u64;
        enc.write_event(&schema, &[FieldValue::Varint(ts1), FieldValue::Varint(1)])
            .unwrap();
        enc.write_event(&schema, &[FieldValue::Varint(ts2), FieldValue::Varint(2)])
            .unwrap();

        let bytes = enc.finish();

        // NOTE(review): this is a byte-level heuristic, not a frame-level check. It assumes
        // 0x05 is the timestamp-reset tag and that the value appears nowhere else in this
        // particular output.
        let reset_count = bytes.iter().filter(|&&b| b == 0x05).count();
        assert_eq!(
            reset_count, 0,
            "base should advance per event, avoiding unnecessary resets"
        );

        let mut dec = Decoder::new(&bytes).unwrap();
        let events: Vec<_> = dec
            .decode_all()
            .into_iter()
            .filter_map(|f| match f {
                DecodedFrame::Event { timestamp_ns, .. } => timestamp_ns,
                _ => None,
            })
            .collect();
        assert_eq!(events, vec![ts1, ts2]);
    }
}