1use crate::codec::{
10 self, Frame, FrameRef, HEADER_SIZE, PoolEntry, PoolEntryRef, SchemaInfo, StackPoolEntry,
11 StackPoolEntryRef, WireTypeId,
12};
13use crate::schema::{SchemaEntry, SchemaRegistry};
14use crate::types::{FieldType, FieldValueRef, InternedStackFrames, InternedString, StackFrames};
15use std::collections::HashMap;
16use std::fmt;
17
/// Error describing why the byte stream could not be decoded.
#[derive(Debug, Clone)]
pub struct DecodeError {
    /// Byte offset reported alongside the failure.
    pub pos: usize,
    /// Human-readable description of the failure.
    pub message: String,
}

impl fmt::Display for DecodeError {
    /// Renders the error as `decode error at byte <pos>: <message>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { pos, message } = self;
        f.write_fmt(format_args!("decode error at byte {}: {}", pos, message))
    }
}

impl std::error::Error for DecodeError {}
34
35#[derive(Debug)]
37pub enum TryForEachError<E> {
38 Decode(DecodeError),
39 User(E),
40}
41
42impl<E: fmt::Display> fmt::Display for TryForEachError<E> {
43 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44 match self {
45 TryForEachError::Decode(e) => write!(f, "{e}"),
46 TryForEachError::User(e) => write!(f, "{e}"),
47 }
48 }
49}
50
51impl<E: fmt::Display + fmt::Debug> std::error::Error for TryForEachError<E> {}
52
53#[non_exhaustive]
58pub struct RawEvent<'a, 'f> {
59 pub type_id: WireTypeId,
60 pub name: &'f str,
61 pub timestamp_ns: Option<u64>,
62 pub fields: &'f [FieldValueRef<'a>],
63 pub schema: &'f SchemaEntry,
64 pub string_pool: &'f StringPool,
65 pub stack_pool: &'f StackPool,
66}
67
68impl<'a, 'f> RawEvent<'a, 'f> {
69 pub fn field_names(&self) -> impl Iterator<Item = &'f str> {
71 self.schema.fields.iter().map(|f| f.name.as_str())
72 }
73}
74
75#[derive(Debug, Default)]
81pub struct StringPool(pub(crate) HashMap<InternedString, String>);
82
83impl StringPool {
84 pub(crate) fn new() -> Self {
85 Self(HashMap::default())
86 }
87
88 pub(crate) fn insert(&mut self, id: InternedString, value: String) {
89 self.0.insert(id, value);
90 }
91
92 pub fn get(&self, id: InternedString) -> Option<&str> {
93 self.0.get(&id).map(|s| s.as_str())
94 }
95
96 pub fn len(&self) -> usize {
97 self.0.len()
98 }
99
100 pub fn is_empty(&self) -> bool {
101 self.0.is_empty()
102 }
103
104 pub fn iter(&self) -> impl Iterator<Item = (InternedString, &str)> {
106 self.0.iter().map(|(&id, v)| (id, v.as_str()))
107 }
108}
109
110#[derive(Debug, Default)]
114pub struct StackPool(pub(crate) HashMap<InternedStackFrames, Vec<u64>>);
115
116impl StackPool {
117 pub(crate) fn new() -> Self {
118 Self(HashMap::default())
119 }
120
121 pub(crate) fn insert(&mut self, id: InternedStackFrames, frames: StackFrames) {
122 self.0.insert(id, frames.0);
123 }
124
125 pub fn get(&self, id: InternedStackFrames) -> Option<&[u64]> {
126 self.0.get(&id).map(|v| v.as_slice())
127 }
128
129 pub fn len(&self) -> usize {
130 self.0.len()
131 }
132
133 pub fn is_empty(&self) -> bool {
134 self.0.is_empty()
135 }
136
137 pub fn iter(&self) -> impl Iterator<Item = (InternedStackFrames, &[u64])> {
139 self.0.iter().map(|(&id, v)| (id, v.as_slice()))
140 }
141}
142
/// Owned frame as returned by [`Decoder::next_frame`].
#[derive(Debug, Clone, PartialEq)]
pub enum DecodedFrame {
    /// A schema definition; the decoder registers it before returning it.
    Schema(SchemaEntry),
    /// An event instance.
    Event {
        /// Wire type id of the event.
        type_id: WireTypeId,
        /// Timestamp decoded for the event, if any. When present the decoder
        /// adopts it as the base for subsequent events.
        timestamp_ns: Option<u64>,
        /// Decoded field values.
        values: Vec<crate::types::FieldValue>,
    },
    /// String-pool entries; valid UTF-8 entries are also added to the
    /// decoder's [`StringPool`].
    StringPool(Vec<PoolEntry>),
    /// Stack-pool entries; they are also added to the decoder's [`StackPool`].
    StackPool(Vec<StackPoolEntry>),
}
156
/// Frame as returned by [`Decoder::next_frame_ref`]; value and pool payloads
/// carry the input lifetime `'a` instead of being owned.
#[derive(Debug, Clone, PartialEq)]
pub enum DecodedFrameRef<'a> {
    /// A schema definition; the decoder registers it before returning it.
    Schema(SchemaEntry),
    /// An event instance.
    Event {
        /// Wire type id of the event.
        type_id: WireTypeId,
        /// Timestamp decoded for the event, if any. When present the decoder
        /// adopts it as the base for subsequent events.
        timestamp_ns: Option<u64>,
        /// Decoded field values.
        values: Vec<FieldValueRef<'a>>,
    },
    /// String-pool entries; valid UTF-8 entries are also copied into the
    /// decoder's [`StringPool`].
    StringPool(Vec<PoolEntryRef<'a>>),
    /// Stack-pool entries; they are also copied into the decoder's
    /// [`StackPool`].
    StackPool(Vec<StackPoolEntryRef<'a>>),
}
169
/// Per-type decode cache kept by [`Decoder`], indexed by wire type id.
struct SchemaCache {
    /// The registered schema.
    entry: SchemaEntry,
    /// `field_type as u8` for each schema field, in schema order.
    field_tags: Vec<u8>,
}
175
/// Stateful decoder over an in-memory byte stream.
///
/// Schemas, interned strings and interned stacks are accumulated as frames
/// are decoded; a stream header found mid-stream resets that state.
pub struct Decoder<'a> {
    /// The complete input buffer.
    data: &'a [u8],
    /// Offset into `data` of the next byte to decode.
    pos: usize,
    /// Schemas registered so far.
    registry: SchemaRegistry,
    /// Decode cache indexed by `WireTypeId.0`; `None` for ids not yet seen.
    schema_cache: Vec<Option<SchemaCache>>,
    /// Strings interned so far.
    string_pool: StringPool,
    /// Stacks interned so far.
    stack_pool: StackPool,
    /// Version returned by `codec::decode_header` for the initial header.
    version: u8,
    /// Base that event timestamp deltas are resolved against.
    timestamp_base_ns: u64,
}
190
191impl<'a> Decoder<'a> {
192 pub fn new(data: &'a [u8]) -> Option<Self> {
193 let version = codec::decode_header(data)?;
194 Some(Self {
195 data,
196 pos: HEADER_SIZE,
197 registry: SchemaRegistry::new(),
198 schema_cache: Vec::new(),
199 string_pool: StringPool::new(),
200 stack_pool: StackPool::new(),
201 version,
202 timestamp_base_ns: 0,
203 })
204 }
205
    /// Schemas registered from the frames decoded so far.
    pub fn registry(&self) -> &SchemaRegistry {
        &self.registry
    }
209
    /// Version decoded from the header the decoder was created with.
    pub fn version(&self) -> u8 {
        self.version
    }
213
    /// Strings interned by the string-pool frames decoded so far.
    pub fn string_pool(&self) -> &StringPool {
        &self.string_pool
    }
217
    /// Stacks interned by the stack-pool frames decoded so far.
    pub fn stack_pool(&self) -> &StackPool {
        &self.stack_pool
    }
221
222 fn reset_state(&mut self) {
226 self.registry = SchemaRegistry::new();
227 self.schema_cache.clear();
228 self.string_pool = StringPool::new();
229 self.stack_pool = StackPool::new();
230 self.timestamp_base_ns = 0;
231 }
232
233 fn try_consume_reset_header(&mut self) -> bool {
236 if self.pos + HEADER_SIZE <= self.data.len()
237 && codec::decode_header(&self.data[self.pos..]).is_some()
238 {
239 self.reset_state();
240 self.pos += HEADER_SIZE;
241 true
242 } else {
243 false
244 }
245 }
246
247 pub fn into_encoder<W: std::io::Write>(self, writer: W) -> crate::encoder::Encoder<W> {
254 crate::encoder::Encoder::from_decoder(
255 self.registry,
256 self.string_pool,
257 self.stack_pool,
258 self.timestamp_base_ns,
259 writer,
260 )
261 }
262
263 pub(crate) fn schema_info(&self, type_id: WireTypeId) -> Option<SchemaInfo<'_>> {
264 self.schema_cache
265 .get(type_id.0 as usize)
266 .and_then(|s| s.as_ref())
267 .map(|c| SchemaInfo {
268 field_tags: &c.field_tags,
269 has_timestamp: c.entry.has_timestamp,
270 })
271 }
272
273 fn register_schema(&mut self, type_id: WireTypeId, entry: SchemaEntry) -> Result<(), String> {
274 let idx = type_id.0 as usize;
275 if idx >= self.schema_cache.len() {
276 self.schema_cache.resize_with(idx + 1, || None);
277 }
278 self.schema_cache[idx] = Some(SchemaCache {
279 field_tags: entry.fields.iter().map(|f| f.field_type as u8).collect(),
280 entry: entry.clone(),
281 });
282 self.registry.register(type_id, entry)
283 }
284
285 pub fn next_frame(&mut self) -> Result<Option<DecodedFrame>, DecodeError> {
289 if self.pos >= self.data.len() {
290 return Ok(None);
291 }
292 if self.try_consume_reset_header() {
293 return self.next_frame();
294 }
295 let remaining = &self.data[self.pos..];
296 let base = self.timestamp_base_ns;
297 let (frame, consumed) =
298 match codec::decode_frame(remaining, |type_id| self.schema_info(type_id), base) {
299 Some(r) => r,
300 None => return Ok(None),
301 };
302 self.pos += consumed;
303 match frame {
304 Frame::Schema { type_id, entry } => {
305 let result = DecodedFrame::Schema(entry.clone());
306 self.register_schema(type_id, entry)
307 .map_err(|msg| DecodeError {
308 pos: self.pos,
309 message: msg,
310 })?;
311 Ok(Some(result))
312 }
313 Frame::Event {
314 type_id,
315 timestamp_ns,
316 values,
317 } => {
318 if let Some(ts) = timestamp_ns {
319 self.timestamp_base_ns = ts;
320 }
321 Ok(Some(DecodedFrame::Event {
322 type_id,
323 timestamp_ns,
324 values,
325 }))
326 }
327 Frame::StringPool(entries) => {
328 for e in &entries {
329 if let Ok(s) = String::from_utf8(e.data.clone()) {
330 self.string_pool.insert(InternedString(e.pool_id), s);
331 }
332 }
333 Ok(Some(DecodedFrame::StringPool(entries)))
334 }
335 Frame::StackPool(entries) => {
336 for e in &entries {
337 self.stack_pool
338 .insert(InternedStackFrames(e.pool_id), e.frames.clone().into());
339 }
340 Ok(Some(DecodedFrame::StackPool(entries)))
341 }
342 Frame::TimestampReset(ts) => {
343 self.timestamp_base_ns = ts;
344 self.next_frame() }
346 }
347 }
348
349 pub fn decode_all(&mut self) -> Vec<DecodedFrame> {
351 let mut frames = Vec::new();
352 while let Ok(Some(f)) = self.next_frame() {
353 frames.push(f);
354 }
355 frames
356 }
357
358 pub fn next_frame_ref(&mut self) -> Result<Option<DecodedFrameRef<'a>>, DecodeError> {
361 if self.pos >= self.data.len() {
362 return Ok(None);
363 }
364 if self.try_consume_reset_header() {
365 return self.next_frame_ref();
366 }
367 let remaining = &self.data[self.pos..];
368 let base = self.timestamp_base_ns;
369 let (frame, consumed) =
370 match codec::decode_frame_ref(remaining, |type_id| self.schema_info(type_id), base) {
371 Some(r) => r,
372 None => return Ok(None),
373 };
374 self.pos += consumed;
375 match frame {
376 FrameRef::Schema { type_id, entry } => {
377 let result = DecodedFrameRef::Schema(entry.clone());
378 self.register_schema(type_id, entry)
379 .map_err(|msg| DecodeError {
380 pos: self.pos,
381 message: msg,
382 })?;
383 Ok(Some(result))
384 }
385 FrameRef::Event {
386 type_id,
387 timestamp_ns,
388 values,
389 } => {
390 if let Some(ts) = timestamp_ns {
391 self.timestamp_base_ns = ts;
392 }
393 Ok(Some(DecodedFrameRef::Event {
394 type_id,
395 timestamp_ns,
396 values,
397 }))
398 }
399 FrameRef::StringPool(entries) => {
400 for e in &entries {
401 if let Ok(s) = std::str::from_utf8(e.data) {
402 self.string_pool
403 .insert(InternedString(e.pool_id), s.to_string());
404 }
405 }
406 Ok(Some(DecodedFrameRef::StringPool(entries)))
407 }
408 FrameRef::StackPool(entries) => {
409 for e in &entries {
410 self.stack_pool
411 .insert(InternedStackFrames(e.pool_id), e.to_stack_frames());
412 }
413 Ok(Some(DecodedFrameRef::StackPool(entries)))
414 }
415 FrameRef::TimestampReset(ts) => {
416 self.timestamp_base_ns = ts;
417 self.next_frame_ref()
418 }
419 }
420 }
421
422 pub fn decode_all_ref(&mut self) -> Vec<DecodedFrameRef<'a>> {
424 let mut frames = Vec::new();
425 while let Ok(Some(f)) = self.next_frame_ref() {
426 frames.push(f);
427 }
428 frames
429 }
430
431 pub fn for_each_event(
440 &mut self,
441 mut f: impl for<'f> FnMut(RawEvent<'a, 'f>),
442 ) -> Result<(), DecodeError> {
443 self.try_for_each_event(|ev| {
444 f(ev);
445 Ok::<(), std::convert::Infallible>(())
446 })
447 .map_err(|e| match e {
448 TryForEachError::Decode(d) => d,
449 TryForEachError::User(inf) => match inf {},
450 })
451 }
452
453 pub fn try_for_each_event<E>(
456 &mut self,
457 mut f: impl for<'f> FnMut(RawEvent<'a, 'f>) -> Result<(), E>,
458 ) -> Result<(), TryForEachError<E>> {
459 let mut values_buf: Vec<FieldValueRef<'a>> = Vec::new();
460 while self.pos < self.data.len() {
461 let remaining = &self.data[self.pos..];
462 let tag = match remaining.first() {
463 Some(t) => *t,
464 None => break,
465 };
466 match tag {
467 codec::TAG_EVENT => {
468 let mut pos = 1;
469 let type_id = match remaining.get(pos..pos + 2) {
470 Some(b) => {
471 pos += 2;
472 WireTypeId(u16::from_le_bytes(b.try_into().unwrap()))
473 }
474 None => {
475 return Err(TryForEachError::Decode(DecodeError {
476 pos: self.pos,
477 message: "truncated event frame".into(),
478 }));
479 }
480 };
481 let cache = match self
482 .schema_cache
483 .get(type_id.0 as usize)
484 .and_then(|s| s.as_ref())
485 {
486 Some(c) => c,
487 None => {
488 return Err(TryForEachError::Decode(DecodeError {
489 pos: self.pos,
490 message: format!("unknown type_id {type_id:?}"),
491 }));
492 }
493 };
494
495 let timestamp_ns = if cache.entry.has_timestamp {
496 match codec::decode_u24_le(&remaining[pos..]) {
497 Some(delta) => {
498 pos += 3;
499 Some(self.timestamp_base_ns + delta as u64)
500 }
501 None => {
502 return Err(TryForEachError::Decode(DecodeError {
503 pos: self.pos + pos,
504 message: "truncated timestamp delta".into(),
505 }));
506 }
507 }
508 } else {
509 None
510 };
511
512 values_buf.clear();
513 for &ftag in &cache.field_tags {
514 let inner_type = match FieldType::from_tag(ftag) {
515 Some(ft) => ft,
516 None => {
517 return Err(TryForEachError::Decode(DecodeError {
518 pos: self.pos + pos,
519 message: format!("unknown field type tag {ftag:#x}"),
520 }));
521 }
522 };
523 if inner_type.is_optional() {
524 match remaining.get(pos) {
525 Some(0x00) => {
526 values_buf.push(FieldValueRef::None);
527 pos += 1;
528 }
529 Some(_) => {
530 pos += 1;
531 match FieldValueRef::decode(inner_type.inner(), remaining, pos)
532 {
533 Some((val, consumed)) => {
534 values_buf.push(val);
535 pos += consumed;
536 }
537 None => {
538 return Err(TryForEachError::Decode(DecodeError {
539 pos: self.pos + pos,
540 message: "truncated optional field value".into(),
541 }));
542 }
543 }
544 }
545 None => {
546 return Err(TryForEachError::Decode(DecodeError {
547 pos: self.pos + pos,
548 message: "truncated optional field prefix".into(),
549 }));
550 }
551 }
552 } else {
553 match FieldValueRef::decode(inner_type, remaining, pos) {
554 Some((val, consumed)) => {
555 values_buf.push(val);
556 pos += consumed;
557 }
558 None => {
559 return Err(TryForEachError::Decode(DecodeError {
560 pos: self.pos + pos,
561 message: "truncated field value".into(),
562 }));
563 }
564 }
565 }
566 }
567 {
573 let Self {
574 pos: self_pos,
575 timestamp_base_ns,
576 ..
577 } = self;
578 *self_pos += pos;
579 if let Some(ts) = timestamp_ns {
580 *timestamp_base_ns = ts;
581 }
582 }
583 f(RawEvent {
584 type_id,
585 name: &cache.entry.name,
586 timestamp_ns,
587 fields: &values_buf,
588 schema: &cache.entry,
589 string_pool: &self.string_pool,
590 stack_pool: &self.stack_pool,
591 })
592 .map_err(TryForEachError::User)?;
593 }
594 codec::TAG_TIMESTAMP_RESET => {
595 let ts = match self.data.get(self.pos + 1..self.pos + 9) {
596 Some(b) => u64::from_le_bytes(b.try_into().unwrap()),
597 None => {
598 return Err(TryForEachError::Decode(DecodeError {
599 pos: self.pos,
600 message: "truncated timestamp reset".into(),
601 }));
602 }
603 };
604 self.timestamp_base_ns = ts;
605 self.pos += 9;
606 }
607 _ => {
608 if tag == codec::MAGIC[0] && self.try_consume_reset_header() {
610 continue;
611 }
612 match self.next_frame_ref() {
613 Ok(Some(_)) => {}
614 Ok(None) => {
615 return Err(TryForEachError::Decode(DecodeError {
616 pos: self.pos,
617 message: format!("failed to decode frame with tag 0x{tag:02x}"),
618 }));
619 }
620 Err(e) => return Err(TryForEachError::Decode(e)),
621 }
622 }
623 }
624 }
625 Ok(())
626 }
627
    /// Returns an iterator that borrows this decoder and yields only its
    /// event frames (see [`EventIter`]).
    pub fn events(&mut self) -> EventIter<'_, 'a> {
        EventIter { decoder: self }
    }
634}
635
636impl<'a> Iterator for Decoder<'a> {
637 type Item = Result<DecodedFrameRef<'a>, DecodeError>;
638
639 fn next(&mut self) -> Option<Self::Item> {
640 self.next_frame_ref().transpose()
641 }
642}
643
644pub struct EventIter<'d, 'a> {
647 decoder: &'d mut Decoder<'a>,
648}
649
650impl<'d, 'a> Iterator for EventIter<'d, 'a> {
651 type Item = Result<DecodedFrameRef<'a>, DecodeError>;
652
653 fn next(&mut self) -> Option<Self::Item> {
654 loop {
655 match self.decoder.next()? {
656 Ok(frame @ DecodedFrameRef::Event { .. }) => return Some(Ok(frame)),
657 Ok(_) => continue, Err(e) => return Some(Err(e)),
659 }
660 }
661 }
662}
663
664#[cfg(test)]
665mod tests {
666 use super::*;
667 use crate::encoder::Encoder;
668 use crate::schema::FieldDef;
669 use crate::types::{FieldType, FieldValue};
670
    /// A stream from an encoder that wrote nothing decodes as version 1 and
    /// contains no frames.
    #[test]
    fn decode_empty_stream() {
        let enc = Encoder::new();
        let data = enc.finish();
        let mut dec = Decoder::new(&data).unwrap();
        assert_eq!(dec.version(), 1);
        assert!(dec.next_frame().unwrap().is_none());
    }
679
    /// A schema registered on the encoder comes back as the first decoded
    /// frame, carrying the same name.
    #[test]
    fn decode_schema_frame() {
        let mut enc = Encoder::new();
        enc.register_schema(
            "Ev",
            vec![FieldDef {
                name: "v".into(),
                field_type: FieldType::Varint,
            }],
        )
        .unwrap();
        let data = enc.finish();
        let mut dec = Decoder::new(&data).unwrap();
        let frame = dec.next_frame().unwrap().unwrap();
        assert!(matches!(frame, DecodedFrame::Schema(s) if s.name == "Ev"));
    }
696
    /// One schema plus one event decode as two frames, and the event carries
    /// the written field value.
    #[test]
    fn decode_event_after_schema() {
        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "Ev",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();
        // Two values go in but only `Varint(42)` comes back as a field;
        // presumably the leading value is the event timestamp — TODO confirm
        // against `Encoder::write_event`.
        enc.write_event(
            &schema,
            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
        )
        .unwrap();
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        let frames = dec.decode_all();
        assert_eq!(frames.len(), 2);
        if let DecodedFrame::Event { values, .. } = &frames[1] {
            assert_eq!(*values, vec![FieldValue::Varint(42)]);
        } else {
            panic!("expected event");
        }
    }
725
    /// Decoding a stream in which a string was interned makes that string
    /// available from the decoder's string pool under the encoder's id.
    #[test]
    fn decode_string_pool_builds_map() {
        let mut enc = Encoder::new();
        let id = enc.intern_string("hello").unwrap();
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        dec.decode_all();
        assert_eq!(dec.string_pool().get(id), Some("hello"));
    }
736
    /// Ten events after one schema decode as eleven frames in total.
    #[test]
    fn decode_multiple_events() {
        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "Ev",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();
        for i in 0..10u64 {
            enc.write_event(
                &schema,
                &[FieldValue::Varint(i * 1000), FieldValue::Varint(i)],
            )
            .unwrap();
        }
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        let frames = dec.decode_all();
        // 1 schema frame + 10 event frames.
        assert_eq!(frames.len(), 11);
    }
762
    /// Input that does not start with a valid header is rejected by
    /// `Decoder::new`.
    #[test]
    fn bad_header_returns_none() {
        assert!(Decoder::new(&[0x00, 0x00, 0x00, 0x00, 1]).is_none());
    }
767
    /// Iterating the decoder yields every frame in stream order: the schema
    /// first, then the events.
    #[test]
    fn iterator_yields_all_frames() {
        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "Ev",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();
        for i in 0..3u64 {
            enc.write_event(
                &schema,
                &[FieldValue::Varint(i * 1000), FieldValue::Varint(i)],
            )
            .unwrap();
        }
        let data = enc.finish();

        let dec = Decoder::new(&data).unwrap();
        let frames: Vec<_> = dec.collect::<Result<Vec<_>, _>>().unwrap();
        // 1 schema frame + 3 event frames.
        assert_eq!(frames.len(), 4);
        assert!(matches!(frames[0], DecodedFrameRef::Schema(_)));
        assert!(matches!(frames[1], DecodedFrameRef::Event { .. }));
    }
796
    /// Taking only the first frames through `by_ref` leaves the decoder
    /// usable: the following call to `next` still yields a frame.
    #[test]
    fn iterator_early_termination() {
        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "Ev",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();
        for i in 0..10u64 {
            enc.write_event(
                &schema,
                &[FieldValue::Varint(i * 1000), FieldValue::Varint(i)],
            )
            .unwrap();
        }
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        let first_two: Vec<_> = dec.by_ref().take(2).collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(first_two.len(), 2);
        // The decoder resumes where `take(2)` stopped.
        let next = dec.next();
        assert!(next.is_some());
    }
826
    /// `Decoder::events` yields only the event frames, skipping the schema
    /// frame that precedes them.
    #[test]
    fn events_iterator_skips_schema() {
        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "Ev",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();
        enc.write_event(
            &schema,
            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
        )
        .unwrap();
        enc.write_event(
            &schema,
            &[FieldValue::Varint(2_000), FieldValue::Varint(99)],
        )
        .unwrap();
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        let events: Vec<_> = dec.events().collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(events.len(), 2);
        for ev in &events {
            assert!(matches!(ev, DecodedFrameRef::Event { .. }));
        }
    }
859
    /// The first item from `Decoder::events` is an event even though a
    /// schema frame comes first in the stream.
    #[test]
    fn events_iterator_first_event_only() {
        let mut enc = Encoder::new();
        let schema = enc
            .register_schema(
                "Ev",
                vec![FieldDef {
                    name: "v".into(),
                    field_type: FieldType::Varint,
                }],
            )
            .unwrap();
        for i in 0..5u64 {
            enc.write_event(
                &schema,
                &[FieldValue::Varint(i * 1000), FieldValue::Varint(i)],
            )
            .unwrap();
        }
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        let first = dec.events().next().unwrap().unwrap();
        assert!(matches!(first, DecodedFrameRef::Event { .. }));
    }
886}