1use crate::codec::{
10 self, Frame, FrameRef, HEADER_SIZE, PoolEntry, PoolEntryRef, SchemaInfo, WireTypeId,
11};
12use crate::schema::{SchemaEntry, SchemaRegistry};
13use crate::types::{FieldType, FieldValueRef, InternedString};
14use std::collections::HashMap;
15use std::fmt;
16
/// Error reported when a byte stream cannot be decoded.
#[derive(Debug, Clone)]
pub struct DecodeError {
    /// Byte offset associated with the failure.
    pub pos: usize,
    /// Human-readable description of the failure.
    pub message: String,
}

impl fmt::Display for DecodeError {
    /// Renders the error as `decode error at byte <pos>: <message>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { pos, message } = self;
        write!(f, "decode error at byte {pos}: {message}")
    }
}

impl std::error::Error for DecodeError {}
33
34#[derive(Debug)]
36pub enum TryForEachError<E> {
37 Decode(DecodeError),
38 User(E),
39}
40
41impl<E: fmt::Display> fmt::Display for TryForEachError<E> {
42 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43 match self {
44 TryForEachError::Decode(e) => write!(f, "{e}"),
45 TryForEachError::User(e) => write!(f, "{e}"),
46 }
47 }
48}
49
50impl<E: fmt::Display + fmt::Debug> std::error::Error for TryForEachError<E> {}
51
52#[non_exhaustive]
57pub struct RawEvent<'a, 'f> {
58 pub type_id: WireTypeId,
59 pub name: &'f str,
60 pub timestamp_ns: Option<u64>,
61 pub fields: &'f [FieldValueRef<'a>],
62 pub schema: &'f SchemaEntry,
63 pub string_pool: &'f StringPool,
64}
65
66impl<'a, 'f> RawEvent<'a, 'f> {
67 pub fn field_names(&self) -> impl Iterator<Item = &'f str> {
69 self.schema.fields.iter().map(|f| f.name.as_str())
70 }
71}
72
73#[derive(Debug, Default)]
79pub struct StringPool(pub(crate) HashMap<InternedString, String>);
80
81impl StringPool {
82 pub(crate) fn new() -> Self {
83 Self(HashMap::default())
84 }
85
86 pub(crate) fn insert(&mut self, id: InternedString, value: String) {
87 self.0.insert(id, value);
88 }
89
90 pub fn get(&self, id: InternedString) -> Option<&str> {
91 self.0.get(&id).map(|s| s.as_str())
92 }
93
94 pub fn len(&self) -> usize {
95 self.0.len()
96 }
97
98 pub fn is_empty(&self) -> bool {
99 self.0.is_empty()
100 }
101
102 pub fn iter(&self) -> impl Iterator<Item = (InternedString, &str)> {
104 self.0.iter().map(|(&id, v)| (id, v.as_str()))
105 }
106}
107
/// A decoded frame whose payload is fully owned.
#[derive(Debug, Clone, PartialEq)]
pub enum DecodedFrame {
    /// A schema definition frame.
    Schema(SchemaEntry),
    /// An event frame.
    Event {
        /// Wire type id of the event.
        type_id: WireTypeId,
        /// Timestamp in nanoseconds, if the frame carried one.
        timestamp_ns: Option<u64>,
        /// Owned field values of the event.
        values: Vec<crate::types::FieldValue>,
    },
    /// A string-pool frame with its entries.
    StringPool(Vec<PoolEntry>),
}
120
/// A decoded frame whose field values and pool entries borrow from the input
/// bytes (lifetime `'a`).
#[derive(Debug, Clone, PartialEq)]
pub enum DecodedFrameRef<'a> {
    /// A schema definition frame.
    Schema(SchemaEntry),
    /// An event frame.
    Event {
        /// Wire type id of the event.
        type_id: WireTypeId,
        /// Timestamp in nanoseconds, if the frame carried one.
        timestamp_ns: Option<u64>,
        /// Field values borrowing from the input.
        values: Vec<FieldValueRef<'a>>,
    },
    /// A string-pool frame with entries borrowing from the input.
    StringPool(Vec<PoolEntryRef<'a>>),
}
132
/// Per-type-id cache entry used by the decoder for fast schema lookups.
struct SchemaCache {
    /// The registered schema entry.
    entry: SchemaEntry,
    /// One byte per schema field: the field's `FieldType` cast to `u8`.
    field_tags: Vec<u8>,
}
138
/// Stateful decoder over a borrowed byte stream.
pub struct Decoder<'a> {
    /// The complete input, including the leading header.
    data: &'a [u8],
    /// Offset into `data` of the next byte to decode.
    pos: usize,
    /// Schemas registered from the schema frames decoded so far.
    registry: SchemaRegistry,
    /// Schema lookup table indexed by `WireTypeId.0`; `None` marks unused slots.
    schema_cache: Vec<Option<SchemaCache>>,
    /// Strings collected from the string-pool frames decoded so far.
    string_pool: StringPool,
    /// Version returned by `codec::decode_header` at construction time.
    version: u8,
    /// Base value (ns) handed to the codec for resolving event timestamps.
    timestamp_base_ns: u64,
}
152
153impl<'a> Decoder<'a> {
154 pub fn new(data: &'a [u8]) -> Option<Self> {
155 let version = codec::decode_header(data)?;
156 Some(Self {
157 data,
158 pos: HEADER_SIZE,
159 registry: SchemaRegistry::new(),
160 schema_cache: Vec::new(),
161 string_pool: StringPool::new(),
162 version,
163 timestamp_base_ns: 0,
164 })
165 }
166
    /// Returns the registry holding the schemas decoded so far.
    pub fn registry(&self) -> &SchemaRegistry {
        &self.registry
    }
170
    /// Returns the version read from the header when the decoder was created.
    pub fn version(&self) -> u8 {
        self.version
    }
174
    /// Returns the string pool built from the pool frames decoded so far.
    pub fn string_pool(&self) -> &StringPool {
        &self.string_pool
    }
178
179 fn reset_state(&mut self) {
183 self.registry = SchemaRegistry::new();
184 self.schema_cache.clear();
185 self.string_pool = StringPool::new();
186 self.timestamp_base_ns = 0;
187 }
188
189 fn try_consume_reset_header(&mut self) -> bool {
192 if self.pos + HEADER_SIZE <= self.data.len()
193 && codec::decode_header(&self.data[self.pos..]).is_some()
194 {
195 self.reset_state();
196 self.pos += HEADER_SIZE;
197 true
198 } else {
199 false
200 }
201 }
202
203 pub fn into_encoder<W: std::io::Write>(self, writer: W) -> crate::encoder::Encoder<W> {
210 crate::encoder::Encoder::from_decoder(
211 self.registry,
212 self.string_pool,
213 self.timestamp_base_ns,
214 writer,
215 )
216 }
217
218 pub(crate) fn schema_info(&self, type_id: WireTypeId) -> Option<SchemaInfo<'_>> {
219 self.schema_cache
220 .get(type_id.0 as usize)
221 .and_then(|s| s.as_ref())
222 .map(|c| SchemaInfo {
223 field_tags: &c.field_tags,
224 has_timestamp: c.entry.has_timestamp,
225 })
226 }
227
228 fn register_schema(&mut self, type_id: WireTypeId, entry: SchemaEntry) -> Result<(), String> {
229 let idx = type_id.0 as usize;
230 if idx >= self.schema_cache.len() {
231 self.schema_cache.resize_with(idx + 1, || None);
232 }
233 self.schema_cache[idx] = Some(SchemaCache {
234 field_tags: entry.fields.iter().map(|f| f.field_type as u8).collect(),
235 entry: entry.clone(),
236 });
237 self.registry.register(type_id, entry)
238 }
239
240 pub fn next_frame(&mut self) -> Result<Option<DecodedFrame>, DecodeError> {
244 if self.pos >= self.data.len() {
245 return Ok(None);
246 }
247 if self.try_consume_reset_header() {
248 return self.next_frame();
249 }
250 let remaining = &self.data[self.pos..];
251 let base = self.timestamp_base_ns;
252 let (frame, consumed) =
253 match codec::decode_frame(remaining, |type_id| self.schema_info(type_id), base) {
254 Some(r) => r,
255 None => return Ok(None),
256 };
257 self.pos += consumed;
258 match frame {
259 Frame::Schema { type_id, entry } => {
260 let result = DecodedFrame::Schema(entry.clone());
261 self.register_schema(type_id, entry)
262 .map_err(|msg| DecodeError {
263 pos: self.pos,
264 message: msg,
265 })?;
266 Ok(Some(result))
267 }
268 Frame::Event {
269 type_id,
270 timestamp_ns,
271 values,
272 } => {
273 if let Some(ts) = timestamp_ns {
274 self.timestamp_base_ns = ts;
275 }
276 Ok(Some(DecodedFrame::Event {
277 type_id,
278 timestamp_ns,
279 values,
280 }))
281 }
282 Frame::StringPool(entries) => {
283 for e in &entries {
284 if let Ok(s) = String::from_utf8(e.data.clone()) {
285 self.string_pool.insert(InternedString(e.pool_id), s);
286 }
287 }
288 Ok(Some(DecodedFrame::StringPool(entries)))
289 }
290 Frame::TimestampReset(ts) => {
291 self.timestamp_base_ns = ts;
292 self.next_frame() }
294 }
295 }
296
297 pub fn decode_all(&mut self) -> Vec<DecodedFrame> {
299 let mut frames = Vec::new();
300 while let Ok(Some(f)) = self.next_frame() {
301 frames.push(f);
302 }
303 frames
304 }
305
306 pub fn next_frame_ref(&mut self) -> Result<Option<DecodedFrameRef<'a>>, DecodeError> {
309 if self.pos >= self.data.len() {
310 return Ok(None);
311 }
312 if self.try_consume_reset_header() {
313 return self.next_frame_ref();
314 }
315 let remaining = &self.data[self.pos..];
316 let base = self.timestamp_base_ns;
317 let (frame, consumed) =
318 match codec::decode_frame_ref(remaining, |type_id| self.schema_info(type_id), base) {
319 Some(r) => r,
320 None => return Ok(None),
321 };
322 self.pos += consumed;
323 match frame {
324 FrameRef::Schema { type_id, entry } => {
325 let result = DecodedFrameRef::Schema(entry.clone());
326 self.register_schema(type_id, entry)
327 .map_err(|msg| DecodeError {
328 pos: self.pos,
329 message: msg,
330 })?;
331 Ok(Some(result))
332 }
333 FrameRef::Event {
334 type_id,
335 timestamp_ns,
336 values,
337 } => {
338 if let Some(ts) = timestamp_ns {
339 self.timestamp_base_ns = ts;
340 }
341 Ok(Some(DecodedFrameRef::Event {
342 type_id,
343 timestamp_ns,
344 values,
345 }))
346 }
347 FrameRef::StringPool(entries) => {
348 for e in &entries {
349 if let Ok(s) = std::str::from_utf8(e.data) {
350 self.string_pool
351 .insert(InternedString(e.pool_id), s.to_string());
352 }
353 }
354 Ok(Some(DecodedFrameRef::StringPool(entries)))
355 }
356 FrameRef::TimestampReset(ts) => {
357 self.timestamp_base_ns = ts;
358 self.next_frame_ref()
359 }
360 }
361 }
362
363 pub fn decode_all_ref(&mut self) -> Vec<DecodedFrameRef<'a>> {
365 let mut frames = Vec::new();
366 while let Ok(Some(f)) = self.next_frame_ref() {
367 frames.push(f);
368 }
369 frames
370 }
371
372 pub fn for_each_event(
381 &mut self,
382 mut f: impl for<'f> FnMut(RawEvent<'a, 'f>),
383 ) -> Result<(), DecodeError> {
384 self.try_for_each_event(|ev| {
385 f(ev);
386 Ok::<(), std::convert::Infallible>(())
387 })
388 .map_err(|e| match e {
389 TryForEachError::Decode(d) => d,
390 TryForEachError::User(inf) => match inf {},
391 })
392 }
393
394 pub fn try_for_each_event<E>(
397 &mut self,
398 mut f: impl for<'f> FnMut(RawEvent<'a, 'f>) -> Result<(), E>,
399 ) -> Result<(), TryForEachError<E>> {
400 let mut values_buf: Vec<FieldValueRef<'a>> = Vec::new();
401 while self.pos < self.data.len() {
402 let remaining = &self.data[self.pos..];
403 let tag = match remaining.first() {
404 Some(t) => *t,
405 None => break,
406 };
407 match tag {
408 codec::TAG_EVENT => {
409 let mut pos = 1;
410 let type_id = match remaining.get(pos..pos + 2) {
411 Some(b) => {
412 pos += 2;
413 WireTypeId(u16::from_le_bytes(b.try_into().unwrap()))
414 }
415 None => {
416 return Err(TryForEachError::Decode(DecodeError {
417 pos: self.pos,
418 message: "truncated event frame".into(),
419 }));
420 }
421 };
422 let cache = match self
423 .schema_cache
424 .get(type_id.0 as usize)
425 .and_then(|s| s.as_ref())
426 {
427 Some(c) => c,
428 None => {
429 return Err(TryForEachError::Decode(DecodeError {
430 pos: self.pos,
431 message: format!("unknown type_id {type_id:?}"),
432 }));
433 }
434 };
435
436 let timestamp_ns = if cache.entry.has_timestamp {
437 match codec::decode_u24_le(&remaining[pos..]) {
438 Some(delta) => {
439 pos += 3;
440 Some(self.timestamp_base_ns + delta as u64)
441 }
442 None => {
443 return Err(TryForEachError::Decode(DecodeError {
444 pos: self.pos + pos,
445 message: "truncated timestamp delta".into(),
446 }));
447 }
448 }
449 } else {
450 None
451 };
452
453 values_buf.clear();
454 for &ftag in &cache.field_tags {
455 let inner_type = match FieldType::from_tag(ftag) {
456 Some(ft) => ft,
457 None => {
458 return Err(TryForEachError::Decode(DecodeError {
459 pos: self.pos + pos,
460 message: format!("unknown field type tag {ftag:#x}"),
461 }));
462 }
463 };
464 if inner_type.is_optional() {
465 match remaining.get(pos) {
466 Some(0x00) => {
467 values_buf.push(FieldValueRef::None);
468 pos += 1;
469 }
470 Some(_) => {
471 pos += 1;
472 match FieldValueRef::decode(inner_type.inner(), remaining, pos)
473 {
474 Some((val, consumed)) => {
475 values_buf.push(val);
476 pos += consumed;
477 }
478 None => {
479 return Err(TryForEachError::Decode(DecodeError {
480 pos: self.pos + pos,
481 message: "truncated optional field value".into(),
482 }));
483 }
484 }
485 }
486 None => {
487 return Err(TryForEachError::Decode(DecodeError {
488 pos: self.pos + pos,
489 message: "truncated optional field prefix".into(),
490 }));
491 }
492 }
493 } else {
494 match FieldValueRef::decode(inner_type, remaining, pos) {
495 Some((val, consumed)) => {
496 values_buf.push(val);
497 pos += consumed;
498 }
499 None => {
500 return Err(TryForEachError::Decode(DecodeError {
501 pos: self.pos + pos,
502 message: "truncated field value".into(),
503 }));
504 }
505 }
506 }
507 }
508 {
514 let Self {
515 pos: self_pos,
516 timestamp_base_ns,
517 ..
518 } = self;
519 *self_pos += pos;
520 if let Some(ts) = timestamp_ns {
521 *timestamp_base_ns = ts;
522 }
523 }
524 f(RawEvent {
525 type_id,
526 name: &cache.entry.name,
527 timestamp_ns,
528 fields: &values_buf,
529 schema: &cache.entry,
530 string_pool: &self.string_pool,
531 })
532 .map_err(TryForEachError::User)?;
533 }
534 codec::TAG_TIMESTAMP_RESET => {
535 let ts = match self.data.get(self.pos + 1..self.pos + 9) {
536 Some(b) => u64::from_le_bytes(b.try_into().unwrap()),
537 None => {
538 return Err(TryForEachError::Decode(DecodeError {
539 pos: self.pos,
540 message: "truncated timestamp reset".into(),
541 }));
542 }
543 };
544 self.timestamp_base_ns = ts;
545 self.pos += 9;
546 }
547 _ => {
548 if tag == codec::MAGIC[0] && self.try_consume_reset_header() {
550 continue;
551 }
552 match self.next_frame_ref() {
553 Ok(Some(_)) => {}
554 Ok(None) => {
555 return Err(TryForEachError::Decode(DecodeError {
556 pos: self.pos,
557 message: format!("failed to decode frame with tag 0x{tag:02x}"),
558 }));
559 }
560 Err(e) => return Err(TryForEachError::Decode(e)),
561 }
562 }
563 }
564 }
565 Ok(())
566 }
567
    /// Returns an iterator over this decoder that yields only event frames
    /// (and decode errors), skipping every other kind of frame.
    pub fn events(&mut self) -> EventIter<'_, 'a> {
        EventIter { decoder: self }
    }
574}
575
576impl<'a> Iterator for Decoder<'a> {
577 type Item = Result<DecodedFrameRef<'a>, DecodeError>;
578
579 fn next(&mut self) -> Option<Self::Item> {
580 self.next_frame_ref().transpose()
581 }
582}
583
584pub struct EventIter<'d, 'a> {
587 decoder: &'d mut Decoder<'a>,
588}
589
590impl<'d, 'a> Iterator for EventIter<'d, 'a> {
591 type Item = Result<DecodedFrameRef<'a>, DecodeError>;
592
593 fn next(&mut self) -> Option<Self::Item> {
594 loop {
595 match self.decoder.next()? {
596 Ok(frame @ DecodedFrameRef::Event { .. }) => return Some(Ok(frame)),
597 Ok(_) => continue, Err(e) => return Some(Err(e)),
599 }
600 }
601 }
602}
603
#[cfg(test)]
mod tests {
    use super::*;
    use crate::encoder::Encoder;
    use crate::schema::FieldDef;
    use crate::types::{FieldType, FieldValue};

    /// Field list shared by every test schema: a single varint field `v`.
    fn varint_field() -> Vec<FieldDef> {
        vec![FieldDef {
            name: "v".into(),
            field_type: FieldType::Varint,
        }]
    }

    /// A stream with no frames still exposes its version and ends immediately.
    #[test]
    fn decode_empty_stream() {
        let data = Encoder::new().finish();
        let mut dec = Decoder::new(&data).unwrap();
        assert_eq!(dec.version(), 1);
        assert!(dec.next_frame().unwrap().is_none());
    }

    /// A registered schema is the first frame of the stream.
    #[test]
    fn decode_schema_frame() {
        let mut enc = Encoder::new();
        enc.register_schema("Ev", varint_field()).unwrap();
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        let frame = dec.next_frame().unwrap().unwrap();
        assert!(matches!(frame, DecodedFrame::Schema(s) if s.name == "Ev"));
    }

    /// An event following its schema decodes to the written field value.
    #[test]
    fn decode_event_after_schema() {
        let mut enc = Encoder::new();
        let schema = enc.register_schema("Ev", varint_field()).unwrap();
        enc.write_event(
            &schema,
            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
        )
        .unwrap();
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        let frames = dec.decode_all();
        assert_eq!(frames.len(), 2);
        match &frames[1] {
            DecodedFrame::Event { values, .. } => {
                assert_eq!(*values, vec![FieldValue::Varint(42)]);
            }
            _ => panic!("expected event"),
        }
    }

    /// Interned strings become available through the decoder's string pool.
    #[test]
    fn decode_string_pool_builds_map() {
        let mut enc = Encoder::new();
        let id = enc.intern_string("hello").unwrap();
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        dec.decode_all();
        assert_eq!(dec.string_pool().get(id), Some("hello"));
    }

    /// One schema frame plus ten event frames are all decoded.
    #[test]
    fn decode_multiple_events() {
        let mut enc = Encoder::new();
        let schema = enc.register_schema("Ev", varint_field()).unwrap();
        for i in 0..10u64 {
            enc.write_event(
                &schema,
                &[FieldValue::Varint(i * 1000), FieldValue::Varint(i)],
            )
            .unwrap();
        }
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        let frames = dec.decode_all();
        assert_eq!(frames.len(), 11);
    }

    /// Data that does not start with a valid header is rejected.
    #[test]
    fn bad_header_returns_none() {
        assert!(Decoder::new(&[0x00, 0x00, 0x00, 0x00, 1]).is_none());
    }

    /// The `Iterator` impl yields the schema frame followed by the events.
    #[test]
    fn iterator_yields_all_frames() {
        let mut enc = Encoder::new();
        let schema = enc.register_schema("Ev", varint_field()).unwrap();
        for i in 0..3u64 {
            enc.write_event(
                &schema,
                &[FieldValue::Varint(i * 1000), FieldValue::Varint(i)],
            )
            .unwrap();
        }
        let data = enc.finish();

        let dec = Decoder::new(&data).unwrap();
        let frames: Vec<_> = dec.collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(frames.len(), 4);
        assert!(matches!(frames[0], DecodedFrameRef::Schema(_)));
        assert!(matches!(frames[1], DecodedFrameRef::Event { .. }));
    }

    /// Taking a prefix of the iterator leaves the decoder usable afterwards.
    #[test]
    fn iterator_early_termination() {
        let mut enc = Encoder::new();
        let schema = enc.register_schema("Ev", varint_field()).unwrap();
        for i in 0..10u64 {
            enc.write_event(
                &schema,
                &[FieldValue::Varint(i * 1000), FieldValue::Varint(i)],
            )
            .unwrap();
        }
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        let first_two: Vec<_> = dec
            .by_ref()
            .take(2)
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(first_two.len(), 2);
        let next = dec.next();
        assert!(next.is_some());
    }

    /// `events()` filters out the schema frame and yields only events.
    #[test]
    fn events_iterator_skips_schema() {
        let mut enc = Encoder::new();
        let schema = enc.register_schema("Ev", varint_field()).unwrap();
        enc.write_event(
            &schema,
            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
        )
        .unwrap();
        enc.write_event(
            &schema,
            &[FieldValue::Varint(2_000), FieldValue::Varint(99)],
        )
        .unwrap();
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        let events: Vec<_> = dec.events().collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(events.len(), 2);
        for ev in &events {
            assert!(matches!(ev, DecodedFrameRef::Event { .. }));
        }
    }

    /// The first item from `events()` is an event even though a schema precedes it.
    #[test]
    fn events_iterator_first_event_only() {
        let mut enc = Encoder::new();
        let schema = enc.register_schema("Ev", varint_field()).unwrap();
        for i in 0..5u64 {
            enc.write_event(
                &schema,
                &[FieldValue::Varint(i * 1000), FieldValue::Varint(i)],
            )
            .unwrap();
        }
        let data = enc.finish();

        let mut dec = Decoder::new(&data).unwrap();
        let first = dec.events().next().unwrap().unwrap();
        assert!(matches!(first, DecodedFrameRef::Event { .. }));
    }
}