1use crate::codec::{
10 self, Frame, FrameRef, HEADER_SIZE, PoolEntry, PoolEntryRef, SchemaInfo, StackPoolEntry,
11 StackPoolEntryRef, WireTypeId,
12};
13use crate::schema::{SchemaEntry, SchemaRegistry};
14use crate::types::{FieldType, FieldValueRef, InternedStackFrames, InternedString, StackFrames};
15use std::collections::HashMap;
16use std::fmt;
17
18#[derive(Debug, Clone)]
22pub struct DecodeError {
23 pub pos: usize,
24 pub message: String,
25}
26
27impl fmt::Display for DecodeError {
28 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29 write!(f, "decode error at byte {}: {}", self.pos, self.message)
30 }
31}
32
33impl std::error::Error for DecodeError {}
34
35#[derive(Debug)]
37pub enum TryForEachError<E> {
38 Decode(DecodeError),
39 User(E),
40}
41
42impl<E: fmt::Display> fmt::Display for TryForEachError<E> {
43 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44 match self {
45 TryForEachError::Decode(e) => write!(f, "{e}"),
46 TryForEachError::User(e) => write!(f, "{e}"),
47 }
48 }
49}
50
51impl<E: fmt::Display + fmt::Debug> std::error::Error for TryForEachError<E> {}
52
53#[non_exhaustive]
58pub struct RawEvent<'a, 'f> {
59 pub type_id: WireTypeId,
60 pub name: &'f str,
61 pub timestamp_ns: Option<u64>,
62 pub fields: &'f [FieldValueRef<'a>],
63 pub schema: &'f SchemaEntry,
64 pub string_pool: &'f StringPool,
65 pub stack_pool: &'f StackPool,
66}
67
68impl<'a, 'f> RawEvent<'a, 'f> {
69 pub fn field_names(&self) -> impl Iterator<Item = &'f str> {
71 self.schema.fields.iter().map(|f| f.name.as_str())
72 }
73}
74
75#[derive(Debug, Default)]
81pub struct StringPool(pub(crate) HashMap<InternedString, String>);
82
83impl StringPool {
84 pub(crate) fn new() -> Self {
85 Self(HashMap::default())
86 }
87
88 pub(crate) fn insert(&mut self, id: InternedString, value: String) {
89 self.0.insert(id, value);
90 }
91
92 pub fn get(&self, id: InternedString) -> Option<&str> {
93 self.0.get(&id).map(|s| s.as_str())
94 }
95
96 pub fn len(&self) -> usize {
97 self.0.len()
98 }
99
100 pub fn is_empty(&self) -> bool {
101 self.0.is_empty()
102 }
103
104 pub fn iter(&self) -> impl Iterator<Item = (InternedString, &str)> {
106 self.0.iter().map(|(&id, v)| (id, v.as_str()))
107 }
108}
109
110#[derive(Debug, Default)]
114pub struct StackPool(pub(crate) HashMap<InternedStackFrames, Vec<u64>>);
115
116impl StackPool {
117 pub(crate) fn new() -> Self {
118 Self(HashMap::default())
119 }
120
121 pub(crate) fn insert(&mut self, id: InternedStackFrames, frames: StackFrames) {
122 self.0.insert(id, frames.0);
123 }
124
125 pub fn get(&self, id: InternedStackFrames) -> Option<&[u64]> {
126 self.0.get(&id).map(|v| v.as_slice())
127 }
128
129 pub fn len(&self) -> usize {
130 self.0.len()
131 }
132
133 pub fn is_empty(&self) -> bool {
134 self.0.is_empty()
135 }
136
137 pub fn iter(&self) -> impl Iterator<Item = (InternedStackFrames, &[u64])> {
139 self.0.iter().map(|(&id, v)| (id, v.as_slice()))
140 }
141}
142
143#[derive(Debug, Clone, PartialEq)]
145pub enum DecodedFrame {
146 Schema(SchemaEntry),
147 Event {
148 type_id: WireTypeId,
149 timestamp_ns: Option<u64>,
151 values: Vec<crate::types::FieldValue>,
152 },
153 StringPool(Vec<PoolEntry>),
154 StackPool(Vec<StackPoolEntry>),
155 SchemaAnnotations {
156 type_id: WireTypeId,
157 annotations: Vec<crate::schema::FieldAnnotation>,
158 },
159}
160
161#[derive(Debug, Clone, PartialEq)]
163pub enum DecodedFrameRef<'a> {
164 Schema(SchemaEntry),
165 Event {
166 type_id: WireTypeId,
167 timestamp_ns: Option<u64>,
168 values: Vec<FieldValueRef<'a>>,
169 },
170 StringPool(Vec<PoolEntryRef<'a>>),
171 StackPool(Vec<StackPoolEntryRef<'a>>),
172 SchemaAnnotations {
173 type_id: WireTypeId,
174 annotations: Vec<crate::schema::FieldAnnotation>,
175 },
176}
177
178struct SchemaCache {
179 entry: SchemaEntry,
180 field_tags: Vec<u8>,
182}
183
184pub struct Decoder<'a> {
189 data: &'a [u8],
190 pos: usize,
191 registry: SchemaRegistry,
192 schema_cache: Vec<Option<SchemaCache>>,
193 string_pool: StringPool,
194 stack_pool: StackPool,
195 version: u8,
196 timestamp_base_ns: u64,
197}
198
199impl<'a> Decoder<'a> {
200 pub fn new(data: &'a [u8]) -> Option<Self> {
201 let version = codec::decode_header(data)?;
202 Some(Self {
203 data,
204 pos: HEADER_SIZE,
205 registry: SchemaRegistry::new(),
206 schema_cache: Vec::new(),
207 string_pool: StringPool::new(),
208 stack_pool: StackPool::new(),
209 version,
210 timestamp_base_ns: 0,
211 })
212 }
213
214 pub fn registry(&self) -> &SchemaRegistry {
215 &self.registry
216 }
217
218 pub fn version(&self) -> u8 {
219 self.version
220 }
221
222 pub fn string_pool(&self) -> &StringPool {
223 &self.string_pool
224 }
225
226 pub fn stack_pool(&self) -> &StackPool {
227 &self.stack_pool
228 }
229
230 fn reset_state(&mut self) {
234 self.registry = SchemaRegistry::new();
235 self.schema_cache.clear();
236 self.string_pool = StringPool::new();
237 self.stack_pool = StackPool::new();
238 self.timestamp_base_ns = 0;
239 }
240
241 fn try_consume_reset_header(&mut self) -> bool {
244 if self.pos + HEADER_SIZE <= self.data.len()
245 && codec::decode_header(&self.data[self.pos..]).is_some()
246 {
247 self.reset_state();
248 self.pos += HEADER_SIZE;
249 true
250 } else {
251 false
252 }
253 }
254
255 pub fn into_encoder<W: std::io::Write>(self, writer: W) -> crate::encoder::Encoder<W> {
262 crate::encoder::Encoder::from_decoder(
263 self.registry,
264 self.string_pool,
265 self.stack_pool,
266 self.timestamp_base_ns,
267 writer,
268 )
269 }
270
271 pub(crate) fn schema_info(&self, type_id: WireTypeId) -> Option<SchemaInfo<'_>> {
272 self.schema_cache
273 .get(type_id.0 as usize)
274 .and_then(|s| s.as_ref())
275 .map(|c| SchemaInfo {
276 field_tags: &c.field_tags,
277 has_timestamp: c.entry.has_timestamp,
278 })
279 }
280
281 fn register_schema(&mut self, type_id: WireTypeId, entry: SchemaEntry) -> Result<(), String> {
282 let idx = type_id.0 as usize;
283 if idx >= self.schema_cache.len() {
284 self.schema_cache.resize_with(idx + 1, || None);
285 }
286 self.schema_cache[idx] = Some(SchemaCache {
287 field_tags: entry.fields.iter().map(|f| f.field_type as u8).collect(),
288 entry: entry.clone(),
289 });
290 self.registry.register(type_id, entry)
291 }
292
293 pub fn next_frame(&mut self) -> Result<Option<DecodedFrame>, DecodeError> {
297 if self.pos >= self.data.len() {
298 return Ok(None);
299 }
300 if self.try_consume_reset_header() {
301 return self.next_frame();
302 }
303 let remaining = &self.data[self.pos..];
304 let base = self.timestamp_base_ns;
305 let (frame, consumed) =
306 match codec::decode_frame(remaining, |type_id| self.schema_info(type_id), base) {
307 Some(r) => r,
308 None => return Ok(None),
309 };
310 self.pos += consumed;
311 match frame {
312 Frame::Schema { type_id, entry } => {
313 let result = DecodedFrame::Schema(entry.clone());
314 self.register_schema(type_id, entry)
315 .map_err(|msg| DecodeError {
316 pos: self.pos,
317 message: msg,
318 })?;
319 Ok(Some(result))
320 }
321 Frame::Event {
322 type_id,
323 timestamp_ns,
324 values,
325 } => {
326 if let Some(ts) = timestamp_ns {
327 self.timestamp_base_ns = ts;
328 }
329 Ok(Some(DecodedFrame::Event {
330 type_id,
331 timestamp_ns,
332 values,
333 }))
334 }
335 Frame::StringPool(entries) => {
336 for e in &entries {
337 if let Ok(s) = String::from_utf8(e.data.clone()) {
338 self.string_pool.insert(InternedString(e.pool_id), s);
339 }
340 }
341 Ok(Some(DecodedFrame::StringPool(entries)))
342 }
343 Frame::StackPool(entries) => {
344 for e in &entries {
345 self.stack_pool
346 .insert(InternedStackFrames(e.pool_id), e.frames.clone().into());
347 }
348 Ok(Some(DecodedFrame::StackPool(entries)))
349 }
350 Frame::TimestampReset(ts) => {
351 self.timestamp_base_ns = ts;
352 self.next_frame() }
354 Frame::SchemaAnnotations {
355 type_id,
356 annotations,
357 } => {
358 if let Some(cache) = self
360 .schema_cache
361 .get_mut(type_id.0 as usize)
362 .and_then(|s| s.as_mut())
363 {
364 cache.entry.annotations.extend_from_slice(&annotations);
365 }
366 if let Some(entry) = self.registry.schemas.get_mut(&type_id) {
367 entry.annotations.extend_from_slice(&annotations);
368 }
369 Ok(Some(DecodedFrame::SchemaAnnotations {
370 type_id,
371 annotations,
372 }))
373 }
374 }
375 }
376
377 pub fn decode_all(&mut self) -> Vec<DecodedFrame> {
379 let mut frames = Vec::new();
380 while let Ok(Some(f)) = self.next_frame() {
381 frames.push(f);
382 }
383 frames
384 }
385
386 pub fn next_frame_ref(&mut self) -> Result<Option<DecodedFrameRef<'a>>, DecodeError> {
389 if self.pos >= self.data.len() {
390 return Ok(None);
391 }
392 if self.try_consume_reset_header() {
393 return self.next_frame_ref();
394 }
395 let remaining = &self.data[self.pos..];
396 let base = self.timestamp_base_ns;
397 let (frame, consumed) =
398 match codec::decode_frame_ref(remaining, |type_id| self.schema_info(type_id), base) {
399 Some(r) => r,
400 None => return Ok(None),
401 };
402 self.pos += consumed;
403 match frame {
404 FrameRef::Schema { type_id, entry } => {
405 let result = DecodedFrameRef::Schema(entry.clone());
406 self.register_schema(type_id, entry)
407 .map_err(|msg| DecodeError {
408 pos: self.pos,
409 message: msg,
410 })?;
411 Ok(Some(result))
412 }
413 FrameRef::Event {
414 type_id,
415 timestamp_ns,
416 values,
417 } => {
418 if let Some(ts) = timestamp_ns {
419 self.timestamp_base_ns = ts;
420 }
421 Ok(Some(DecodedFrameRef::Event {
422 type_id,
423 timestamp_ns,
424 values,
425 }))
426 }
427 FrameRef::StringPool(entries) => {
428 for e in &entries {
429 if let Ok(s) = std::str::from_utf8(e.data) {
430 self.string_pool
431 .insert(InternedString(e.pool_id), s.to_string());
432 }
433 }
434 Ok(Some(DecodedFrameRef::StringPool(entries)))
435 }
436 FrameRef::StackPool(entries) => {
437 for e in &entries {
438 self.stack_pool
439 .insert(InternedStackFrames(e.pool_id), e.to_stack_frames());
440 }
441 Ok(Some(DecodedFrameRef::StackPool(entries)))
442 }
443 FrameRef::TimestampReset(ts) => {
444 self.timestamp_base_ns = ts;
445 self.next_frame_ref()
446 }
447 FrameRef::SchemaAnnotations {
448 type_id,
449 annotations,
450 } => {
451 if let Some(cache) = self
452 .schema_cache
453 .get_mut(type_id.0 as usize)
454 .and_then(|s| s.as_mut())
455 {
456 cache.entry.annotations.extend_from_slice(&annotations);
457 }
458 if let Some(entry) = self.registry.schemas.get_mut(&type_id) {
459 entry.annotations.extend_from_slice(&annotations);
460 }
461 Ok(Some(DecodedFrameRef::SchemaAnnotations {
462 type_id,
463 annotations,
464 }))
465 }
466 }
467 }
468
469 pub fn decode_all_ref(&mut self) -> Vec<DecodedFrameRef<'a>> {
471 let mut frames = Vec::new();
472 while let Ok(Some(f)) = self.next_frame_ref() {
473 frames.push(f);
474 }
475 frames
476 }
477
478 pub fn for_each_event(
487 &mut self,
488 mut f: impl for<'f> FnMut(RawEvent<'a, 'f>),
489 ) -> Result<(), DecodeError> {
490 self.try_for_each_event(|ev| {
491 f(ev);
492 Ok::<(), std::convert::Infallible>(())
493 })
494 .map_err(|e| match e {
495 TryForEachError::Decode(d) => d,
496 TryForEachError::User(inf) => match inf {},
497 })
498 }
499
500 pub fn try_for_each_event<E>(
503 &mut self,
504 mut f: impl for<'f> FnMut(RawEvent<'a, 'f>) -> Result<(), E>,
505 ) -> Result<(), TryForEachError<E>> {
506 let mut values_buf: Vec<FieldValueRef<'a>> = Vec::new();
507 while self.pos < self.data.len() {
508 let remaining = &self.data[self.pos..];
509 let tag = match remaining.first() {
510 Some(t) => *t,
511 None => break,
512 };
513 match tag {
514 codec::TAG_EVENT => {
515 let mut pos = 1;
516 let type_id = match remaining.get(pos..pos + 2) {
517 Some(b) => {
518 pos += 2;
519 WireTypeId(u16::from_le_bytes(b.try_into().unwrap()))
520 }
521 None => {
522 return Err(TryForEachError::Decode(DecodeError {
523 pos: self.pos,
524 message: "truncated event frame".into(),
525 }));
526 }
527 };
528 let cache = match self
529 .schema_cache
530 .get(type_id.0 as usize)
531 .and_then(|s| s.as_ref())
532 {
533 Some(c) => c,
534 None => {
535 return Err(TryForEachError::Decode(DecodeError {
536 pos: self.pos,
537 message: format!("unknown type_id {type_id:?}"),
538 }));
539 }
540 };
541
542 let timestamp_ns = if cache.entry.has_timestamp {
543 match codec::decode_u24_le(&remaining[pos..]) {
544 Some(delta) => {
545 pos += 3;
546 Some(self.timestamp_base_ns + delta as u64)
547 }
548 None => {
549 return Err(TryForEachError::Decode(DecodeError {
550 pos: self.pos + pos,
551 message: "truncated timestamp delta".into(),
552 }));
553 }
554 }
555 } else {
556 None
557 };
558
559 values_buf.clear();
560 for &ftag in &cache.field_tags {
561 let inner_type = match FieldType::from_tag(ftag) {
562 Some(ft) => ft,
563 None => {
564 return Err(TryForEachError::Decode(DecodeError {
565 pos: self.pos + pos,
566 message: format!("unknown field type tag {ftag:#x}"),
567 }));
568 }
569 };
570 if inner_type.is_optional() {
571 match remaining.get(pos) {
572 Some(0x00) => {
573 values_buf.push(FieldValueRef::None);
574 pos += 1;
575 }
576 Some(_) => {
577 pos += 1;
578 match FieldValueRef::decode(inner_type.inner(), remaining, pos)
579 {
580 Some((val, consumed)) => {
581 values_buf.push(val);
582 pos += consumed;
583 }
584 None => {
585 return Err(TryForEachError::Decode(DecodeError {
586 pos: self.pos + pos,
587 message: "truncated optional field value".into(),
588 }));
589 }
590 }
591 }
592 None => {
593 return Err(TryForEachError::Decode(DecodeError {
594 pos: self.pos + pos,
595 message: "truncated optional field prefix".into(),
596 }));
597 }
598 }
599 } else {
600 match FieldValueRef::decode(inner_type, remaining, pos) {
601 Some((val, consumed)) => {
602 values_buf.push(val);
603 pos += consumed;
604 }
605 None => {
606 return Err(TryForEachError::Decode(DecodeError {
607 pos: self.pos + pos,
608 message: "truncated field value".into(),
609 }));
610 }
611 }
612 }
613 }
614 {
620 let Self {
621 pos: self_pos,
622 timestamp_base_ns,
623 ..
624 } = self;
625 *self_pos += pos;
626 if let Some(ts) = timestamp_ns {
627 *timestamp_base_ns = ts;
628 }
629 }
630 f(RawEvent {
631 type_id,
632 name: &cache.entry.name,
633 timestamp_ns,
634 fields: &values_buf,
635 schema: &cache.entry,
636 string_pool: &self.string_pool,
637 stack_pool: &self.stack_pool,
638 })
639 .map_err(TryForEachError::User)?;
640 }
641 codec::TAG_TIMESTAMP_RESET => {
642 let ts = match self.data.get(self.pos + 1..self.pos + 9) {
643 Some(b) => u64::from_le_bytes(b.try_into().unwrap()),
644 None => {
645 return Err(TryForEachError::Decode(DecodeError {
646 pos: self.pos,
647 message: "truncated timestamp reset".into(),
648 }));
649 }
650 };
651 self.timestamp_base_ns = ts;
652 self.pos += 9;
653 }
654 _ => {
655 if tag == codec::MAGIC[0] && self.try_consume_reset_header() {
657 continue;
658 }
659 match self.next_frame_ref() {
660 Ok(Some(_)) => {}
661 Ok(None) => {
662 return Err(TryForEachError::Decode(DecodeError {
663 pos: self.pos,
664 message: format!("failed to decode frame with tag 0x{tag:02x}"),
665 }));
666 }
667 Err(e) => return Err(TryForEachError::Decode(e)),
668 }
669 }
670 }
671 }
672 Ok(())
673 }
674
675 pub fn events(&mut self) -> EventIter<'_, 'a> {
679 EventIter { decoder: self }
680 }
681}
682
683impl<'a> Iterator for Decoder<'a> {
684 type Item = Result<DecodedFrameRef<'a>, DecodeError>;
685
686 fn next(&mut self) -> Option<Self::Item> {
687 self.next_frame_ref().transpose()
688 }
689}
690
691pub struct EventIter<'d, 'a> {
694 decoder: &'d mut Decoder<'a>,
695}
696
697impl<'d, 'a> Iterator for EventIter<'d, 'a> {
698 type Item = Result<DecodedFrameRef<'a>, DecodeError>;
699
700 fn next(&mut self) -> Option<Self::Item> {
701 loop {
702 match self.decoder.next()? {
703 Ok(frame @ DecodedFrameRef::Event { .. }) => return Some(Ok(frame)),
704 Ok(_) => continue, Err(e) => return Some(Err(e)),
706 }
707 }
708 }
709}
710
711#[cfg(test)]
712mod tests {
713 use super::*;
714 use crate::encoder::Encoder;
715 use crate::schema::FieldDef;
716 use crate::types::{FieldType, FieldValue};
717
718 #[test]
719 fn decode_empty_stream() {
720 let enc = Encoder::new();
721 let data = enc.finish();
722 let mut dec = Decoder::new(&data).unwrap();
723 assert_eq!(dec.version(), 1);
724 assert!(dec.next_frame().unwrap().is_none());
725 }
726
727 #[test]
728 fn decode_schema_frame() {
729 let mut enc = Encoder::new();
730 enc.register_schema(
731 "Ev",
732 vec![FieldDef {
733 name: "v".into(),
734 field_type: FieldType::Varint,
735 }],
736 )
737 .unwrap();
738 let data = enc.finish();
739 let mut dec = Decoder::new(&data).unwrap();
740 let frame = dec.next_frame().unwrap().unwrap();
741 assert!(matches!(frame, DecodedFrame::Schema(s) if s.name == "Ev"));
742 }
743
744 #[test]
745 fn decode_event_after_schema() {
746 let mut enc = Encoder::new();
747 let schema = enc
748 .register_schema(
749 "Ev",
750 vec![FieldDef {
751 name: "v".into(),
752 field_type: FieldType::Varint,
753 }],
754 )
755 .unwrap();
756 enc.write_event(
757 &schema,
758 &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
759 )
760 .unwrap();
761 let data = enc.finish();
762
763 let mut dec = Decoder::new(&data).unwrap();
764 let frames = dec.decode_all();
765 assert_eq!(frames.len(), 2);
766 if let DecodedFrame::Event { values, .. } = &frames[1] {
767 assert_eq!(*values, vec![FieldValue::Varint(42)]);
768 } else {
769 panic!("expected event");
770 }
771 }
772
773 #[test]
774 fn decode_string_pool_builds_map() {
775 let mut enc = Encoder::new();
776 let id = enc.intern_string("hello").unwrap();
777 let data = enc.finish();
778
779 let mut dec = Decoder::new(&data).unwrap();
780 dec.decode_all();
781 assert_eq!(dec.string_pool().get(id), Some("hello"));
782 }
783
784 #[test]
785 fn decode_multiple_events() {
786 let mut enc = Encoder::new();
787 let schema = enc
788 .register_schema(
789 "Ev",
790 vec![FieldDef {
791 name: "v".into(),
792 field_type: FieldType::Varint,
793 }],
794 )
795 .unwrap();
796 for i in 0..10u64 {
797 enc.write_event(
798 &schema,
799 &[FieldValue::Varint(i * 1000), FieldValue::Varint(i)],
800 )
801 .unwrap();
802 }
803 let data = enc.finish();
804
805 let mut dec = Decoder::new(&data).unwrap();
806 let frames = dec.decode_all();
807 assert_eq!(frames.len(), 11);
808 }
809
810 #[test]
811 fn bad_header_returns_none() {
812 assert!(Decoder::new(&[0x00, 0x00, 0x00, 0x00, 1]).is_none());
813 }
814
815 #[test]
816 fn iterator_yields_all_frames() {
817 let mut enc = Encoder::new();
818 let schema = enc
819 .register_schema(
820 "Ev",
821 vec![FieldDef {
822 name: "v".into(),
823 field_type: FieldType::Varint,
824 }],
825 )
826 .unwrap();
827 for i in 0..3u64 {
828 enc.write_event(
829 &schema,
830 &[FieldValue::Varint(i * 1000), FieldValue::Varint(i)],
831 )
832 .unwrap();
833 }
834 let data = enc.finish();
835
836 let dec = Decoder::new(&data).unwrap();
837 let frames: Vec<_> = dec.collect::<Result<Vec<_>, _>>().unwrap();
838 assert_eq!(frames.len(), 4);
840 assert!(matches!(frames[0], DecodedFrameRef::Schema(_)));
841 assert!(matches!(frames[1], DecodedFrameRef::Event { .. }));
842 }
843
844 #[test]
845 fn iterator_early_termination() {
846 let mut enc = Encoder::new();
847 let schema = enc
848 .register_schema(
849 "Ev",
850 vec![FieldDef {
851 name: "v".into(),
852 field_type: FieldType::Varint,
853 }],
854 )
855 .unwrap();
856 for i in 0..10u64 {
857 enc.write_event(
858 &schema,
859 &[FieldValue::Varint(i * 1000), FieldValue::Varint(i)],
860 )
861 .unwrap();
862 }
863 let data = enc.finish();
864
865 let mut dec = Decoder::new(&data).unwrap();
866 let first_two: Vec<_> = dec.by_ref().take(2).collect::<Result<Vec<_>, _>>().unwrap();
868 assert_eq!(first_two.len(), 2);
869 let next = dec.next();
871 assert!(next.is_some());
872 }
873
874 #[test]
875 fn events_iterator_skips_schema() {
876 let mut enc = Encoder::new();
877 let schema = enc
878 .register_schema(
879 "Ev",
880 vec![FieldDef {
881 name: "v".into(),
882 field_type: FieldType::Varint,
883 }],
884 )
885 .unwrap();
886 enc.write_event(
887 &schema,
888 &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
889 )
890 .unwrap();
891 enc.write_event(
892 &schema,
893 &[FieldValue::Varint(2_000), FieldValue::Varint(99)],
894 )
895 .unwrap();
896 let data = enc.finish();
897
898 let mut dec = Decoder::new(&data).unwrap();
899 let events: Vec<_> = dec.events().collect::<Result<Vec<_>, _>>().unwrap();
900 assert_eq!(events.len(), 2);
902 for ev in &events {
903 assert!(matches!(ev, DecodedFrameRef::Event { .. }));
904 }
905 }
906
907 #[test]
908 fn events_iterator_first_event_only() {
909 let mut enc = Encoder::new();
910 let schema = enc
911 .register_schema(
912 "Ev",
913 vec![FieldDef {
914 name: "v".into(),
915 field_type: FieldType::Varint,
916 }],
917 )
918 .unwrap();
919 for i in 0..5u64 {
920 enc.write_event(
921 &schema,
922 &[FieldValue::Varint(i * 1000), FieldValue::Varint(i)],
923 )
924 .unwrap();
925 }
926 let data = enc.finish();
927
928 let mut dec = Decoder::new(&data).unwrap();
929 let first = dec.events().next().unwrap().unwrap();
931 assert!(matches!(first, DecodedFrameRef::Event { .. }));
932 }
933}