1use crate::codec::{
10 self, Frame, FrameRef, HEADER_SIZE, PoolEntry, PoolEntryRef, SchemaInfo, StackPoolEntry,
11 StackPoolEntryRef, WireTypeId,
12};
13use crate::schema::{SchemaEntry, SchemaRegistry};
14use crate::types::{FieldType, FieldValueRef, InternedStackFrames, InternedString, StackFrames};
15use std::collections::HashMap;
16use std::fmt;
17
/// Error describing a failure to decode the byte stream.
///
/// Rendered by its `Display` impl as `decode error at byte {pos}: {message}`.
#[derive(Debug, Clone)]
pub struct DecodeError {
    /// Byte offset into the input at which the failure is reported.
    pub pos: usize,
    /// Human-readable description of the failure.
    pub message: String,
}
26
27impl fmt::Display for DecodeError {
28 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29 write!(f, "decode error at byte {}: {}", self.pos, self.message)
30 }
31}
32
// Marker impl: `DecodeError` exposes no underlying `source()`.
impl std::error::Error for DecodeError {}
34
/// Error returned by [`Decoder::try_for_each_event`]: either the stream could
/// not be decoded, or the caller's callback returned an error.
#[derive(Debug)]
pub enum TryForEachError<E> {
    /// The decoder failed while reading the stream.
    Decode(DecodeError),
    /// The user-supplied callback returned `Err`.
    User(E),
}
41
42impl<E: fmt::Display> fmt::Display for TryForEachError<E> {
43 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44 match self {
45 TryForEachError::Decode(e) => write!(f, "{e}"),
46 TryForEachError::User(e) => write!(f, "{e}"),
47 }
48 }
49}
50
// Marker impl: `E` is only required to be `Display + Debug`, and no `source()` is exposed.
impl<E: fmt::Display + fmt::Debug> std::error::Error for TryForEachError<E> {}
52
/// A decoded event handed to the callbacks of [`Decoder::for_each_event`] and
/// [`Decoder::try_for_each_event`].
///
/// `'a` is the lifetime of the input bytes; `'f` is the lifetime of the
/// decoder-owned state borrowed for the duration of one callback invocation.
#[non_exhaustive]
pub struct RawEvent<'a, 'f> {
    /// Wire type id read from the event frame.
    pub type_id: WireTypeId,
    /// Name of the schema registered for `type_id`.
    pub name: &'f str,
    /// Timestamp of the event, or `None` when the schema has no timestamp.
    pub timestamp_ns: Option<u64>,
    /// Decoded field values, one per schema field, in schema order.
    pub fields: &'f [FieldValueRef<'a>],
    /// Schema entry the event was decoded against.
    pub schema: &'f SchemaEntry,
    /// String pool accumulated by the decoder so far.
    pub string_pool: &'f StringPool,
    /// Stack pool accumulated by the decoder so far.
    pub stack_pool: &'f StackPool,
}
67
68impl<'a, 'f> RawEvent<'a, 'f> {
69 pub fn field_names(&self) -> impl Iterator<Item = &'f str> {
71 self.schema.fields.iter().map(|f| f.name.as_str())
72 }
73
74 #[cfg(feature = "serde-deserialize")]
90 pub fn deserialize<E: serde::de::DeserializeOwned>(&self) -> Result<E, crate::de::DeserError> {
91 crate::de::from_raw_event(self)
92 }
93}
94
/// Map from interned-string id to the string's text.
#[derive(Debug, Default)]
pub struct StringPool(pub(crate) HashMap<InternedString, String>);
102
103impl StringPool {
104 pub(crate) fn new() -> Self {
105 Self(HashMap::default())
106 }
107
108 pub(crate) fn insert(&mut self, id: InternedString, value: String) {
109 self.0.insert(id, value);
110 }
111
112 pub fn get(&self, id: InternedString) -> Option<&str> {
113 self.0.get(&id).map(|s| s.as_str())
114 }
115
116 pub fn len(&self) -> usize {
117 self.0.len()
118 }
119
120 pub fn is_empty(&self) -> bool {
121 self.0.is_empty()
122 }
123
124 pub fn iter(&self) -> impl Iterator<Item = (InternedString, &str)> {
126 self.0.iter().map(|(&id, v)| (id, v.as_str()))
127 }
128}
129
/// Map from interned-stack id to the stack's frames, stored as raw `u64` values.
#[derive(Debug, Default)]
pub struct StackPool(pub(crate) HashMap<InternedStackFrames, Vec<u64>>);
135
136impl StackPool {
137 pub(crate) fn new() -> Self {
138 Self(HashMap::default())
139 }
140
141 pub(crate) fn insert(&mut self, id: InternedStackFrames, frames: StackFrames) {
142 self.0.insert(id, frames.0);
143 }
144
145 pub fn get(&self, id: InternedStackFrames) -> Option<&[u64]> {
146 self.0.get(&id).map(|v| v.as_slice())
147 }
148
149 pub fn len(&self) -> usize {
150 self.0.len()
151 }
152
153 pub fn is_empty(&self) -> bool {
154 self.0.is_empty()
155 }
156
157 pub fn iter(&self) -> impl Iterator<Item = (InternedStackFrames, &[u64])> {
159 self.0.iter().map(|(&id, v)| (id, v.as_slice()))
160 }
161}
162
/// A frame returned by [`Decoder::next_frame`], holding owned data.
#[derive(Debug, Clone, PartialEq)]
pub enum DecodedFrame {
    /// A schema definition; the decoder has also registered it.
    Schema(SchemaEntry),
    /// An event decoded against a previously registered schema.
    Event {
        /// Wire type id of the event's schema.
        type_id: WireTypeId,
        /// Timestamp of the event, if the frame carried one.
        timestamp_ns: Option<u64>,
        /// Decoded field values.
        values: Vec<crate::types::FieldValue>,
    },
    /// String-pool entries as they appeared in the frame.
    StringPool(Vec<PoolEntry>),
    /// Stack-pool entries as they appeared in the frame.
    StackPool(Vec<StackPoolEntry>),
    /// Annotations attached to the schema identified by `type_id`.
    SchemaAnnotations {
        /// Wire type id of the annotated schema.
        type_id: WireTypeId,
        /// Annotations carried by the frame.
        annotations: Vec<crate::schema::FieldAnnotation>,
    },
}
180
/// A frame returned by [`Decoder::next_frame_ref`]; values and pool entries
/// are parameterized by the input lifetime `'a`.
#[derive(Debug, Clone, PartialEq)]
pub enum DecodedFrameRef<'a> {
    /// A schema definition; the decoder has also registered it.
    Schema(SchemaEntry),
    /// An event decoded against a previously registered schema.
    Event {
        /// Wire type id of the event's schema.
        type_id: WireTypeId,
        /// Timestamp of the event, if the frame carried one.
        timestamp_ns: Option<u64>,
        /// Decoded field values.
        values: Vec<FieldValueRef<'a>>,
    },
    /// String-pool entries as they appeared in the frame.
    StringPool(Vec<PoolEntryRef<'a>>),
    /// Stack-pool entries as they appeared in the frame.
    StackPool(Vec<StackPoolEntryRef<'a>>),
    /// Annotations attached to the schema identified by `type_id`.
    SchemaAnnotations {
        /// Wire type id of the annotated schema.
        type_id: WireTypeId,
        /// Annotations carried by the frame.
        annotations: Vec<crate::schema::FieldAnnotation>,
    },
}
197
/// Per-type-id cache entry used on the event decoding path.
struct SchemaCache {
    /// The registered schema.
    entry: SchemaEntry,
    /// `field_type as u8` for each schema field, in schema order.
    field_tags: Vec<u8>,
}
203
/// Streaming decoder over an in-memory byte buffer.
pub struct Decoder<'a> {
    /// The complete input buffer, including the leading header.
    data: &'a [u8],
    /// Offset of the next undecoded byte in `data`.
    pos: usize,
    /// Schemas registered by the frames decoded so far.
    registry: SchemaRegistry,
    /// Schema lookup table indexed by `WireTypeId.0`; `None` for unregistered ids.
    schema_cache: Vec<Option<SchemaCache>>,
    /// Interned strings collected from string-pool frames.
    string_pool: StringPool,
    /// Interned stacks collected from stack-pool frames.
    stack_pool: StackPool,
    /// Version value returned by `codec::decode_header` for the initial header.
    version: u8,
    /// Base to which event timestamp deltas are added; updated by events
    /// that carry a timestamp and by timestamp-reset frames.
    timestamp_base_ns: u64,
}
218
219impl<'a> Decoder<'a> {
220 pub fn new(data: &'a [u8]) -> Option<Self> {
221 let version = codec::decode_header(data)?;
222 Some(Self {
223 data,
224 pos: HEADER_SIZE,
225 registry: SchemaRegistry::new(),
226 schema_cache: Vec::new(),
227 string_pool: StringPool::new(),
228 stack_pool: StackPool::new(),
229 version,
230 timestamp_base_ns: 0,
231 })
232 }
233
/// Returns the schema registry populated by the frames decoded so far.
pub fn registry(&self) -> &SchemaRegistry {
    &self.registry
}
237
/// Returns the version decoded from the header when the decoder was created.
pub fn version(&self) -> u8 {
    self.version
}
241
/// Returns the string pool accumulated from the frames decoded so far.
pub fn string_pool(&self) -> &StringPool {
    &self.string_pool
}
245
/// Returns the stack pool accumulated from the frames decoded so far.
pub fn stack_pool(&self) -> &StackPool {
    &self.stack_pool
}
249
250 fn reset_state(&mut self) {
254 self.registry = SchemaRegistry::new();
255 self.schema_cache.clear();
256 self.string_pool = StringPool::new();
257 self.stack_pool = StackPool::new();
258 self.timestamp_base_ns = 0;
259 }
260
261 fn try_consume_reset_header(&mut self) -> bool {
264 if self.pos + HEADER_SIZE <= self.data.len()
265 && codec::decode_header(&self.data[self.pos..]).is_some()
266 {
267 self.reset_state();
268 self.pos += HEADER_SIZE;
269 true
270 } else {
271 false
272 }
273 }
274
275 pub fn into_encoder<W: std::io::Write>(self, writer: W) -> crate::encoder::Encoder<W> {
282 crate::encoder::Encoder::from_decoder(
283 self.registry,
284 self.string_pool,
285 self.stack_pool,
286 self.timestamp_base_ns,
287 writer,
288 )
289 }
290
291 pub(crate) fn schema_info(&self, type_id: WireTypeId) -> Option<SchemaInfo<'_>> {
292 self.schema_cache
293 .get(type_id.0 as usize)
294 .and_then(|s| s.as_ref())
295 .map(|c| SchemaInfo {
296 field_tags: &c.field_tags,
297 has_timestamp: c.entry.has_timestamp,
298 })
299 }
300
301 fn register_schema(&mut self, type_id: WireTypeId, entry: SchemaEntry) -> Result<(), String> {
302 let idx = type_id.0 as usize;
303 if idx >= self.schema_cache.len() {
304 self.schema_cache.resize_with(idx + 1, || None);
305 }
306 self.schema_cache[idx] = Some(SchemaCache {
307 field_tags: entry.fields.iter().map(|f| f.field_type as u8).collect(),
308 entry: entry.clone(),
309 });
310 self.registry.register(type_id, entry)
311 }
312
313 pub fn next_frame(&mut self) -> Result<Option<DecodedFrame>, DecodeError> {
317 if self.pos >= self.data.len() {
318 return Ok(None);
319 }
320 if self.try_consume_reset_header() {
321 return self.next_frame();
322 }
323 let remaining = &self.data[self.pos..];
324 let base = self.timestamp_base_ns;
325 let (frame, consumed) =
326 match codec::decode_frame(remaining, |type_id| self.schema_info(type_id), base) {
327 Some(r) => r,
328 None => return Ok(None),
329 };
330 self.pos += consumed;
331 match frame {
332 Frame::Schema { type_id, entry } => {
333 let result = DecodedFrame::Schema(entry.clone());
334 self.register_schema(type_id, entry)
335 .map_err(|msg| DecodeError {
336 pos: self.pos,
337 message: msg,
338 })?;
339 Ok(Some(result))
340 }
341 Frame::Event {
342 type_id,
343 timestamp_ns,
344 values,
345 } => {
346 if let Some(ts) = timestamp_ns {
347 self.timestamp_base_ns = ts;
348 }
349 Ok(Some(DecodedFrame::Event {
350 type_id,
351 timestamp_ns,
352 values,
353 }))
354 }
355 Frame::StringPool(entries) => {
356 for e in &entries {
357 if let Ok(s) = String::from_utf8(e.data.clone()) {
358 self.string_pool.insert(InternedString(e.pool_id), s);
359 }
360 }
361 Ok(Some(DecodedFrame::StringPool(entries)))
362 }
363 Frame::StackPool(entries) => {
364 for e in &entries {
365 self.stack_pool
366 .insert(InternedStackFrames(e.pool_id), e.frames.clone().into());
367 }
368 Ok(Some(DecodedFrame::StackPool(entries)))
369 }
370 Frame::TimestampReset(ts) => {
371 self.timestamp_base_ns = ts;
372 self.next_frame() }
374 Frame::SchemaAnnotations {
375 type_id,
376 annotations,
377 } => {
378 if let Some(cache) = self
380 .schema_cache
381 .get_mut(type_id.0 as usize)
382 .and_then(|s| s.as_mut())
383 {
384 cache.entry.annotations.extend_from_slice(&annotations);
385 }
386 if let Some(entry) = self.registry.schemas.get_mut(&type_id) {
387 entry.annotations.extend_from_slice(&annotations);
388 }
389 Ok(Some(DecodedFrame::SchemaAnnotations {
390 type_id,
391 annotations,
392 }))
393 }
394 }
395 }
396
397 pub fn decode_all(&mut self) -> Vec<DecodedFrame> {
399 let mut frames = Vec::new();
400 while let Ok(Some(f)) = self.next_frame() {
401 frames.push(f);
402 }
403 frames
404 }
405
406 pub fn next_frame_ref(&mut self) -> Result<Option<DecodedFrameRef<'a>>, DecodeError> {
409 if self.pos >= self.data.len() {
410 return Ok(None);
411 }
412 if self.try_consume_reset_header() {
413 return self.next_frame_ref();
414 }
415 let remaining = &self.data[self.pos..];
416 let base = self.timestamp_base_ns;
417 let (frame, consumed) =
418 match codec::decode_frame_ref(remaining, |type_id| self.schema_info(type_id), base) {
419 Some(r) => r,
420 None => return Ok(None),
421 };
422 self.pos += consumed;
423 match frame {
424 FrameRef::Schema { type_id, entry } => {
425 let result = DecodedFrameRef::Schema(entry.clone());
426 self.register_schema(type_id, entry)
427 .map_err(|msg| DecodeError {
428 pos: self.pos,
429 message: msg,
430 })?;
431 Ok(Some(result))
432 }
433 FrameRef::Event {
434 type_id,
435 timestamp_ns,
436 values,
437 } => {
438 if let Some(ts) = timestamp_ns {
439 self.timestamp_base_ns = ts;
440 }
441 Ok(Some(DecodedFrameRef::Event {
442 type_id,
443 timestamp_ns,
444 values,
445 }))
446 }
447 FrameRef::StringPool(entries) => {
448 for e in &entries {
449 if let Ok(s) = std::str::from_utf8(e.data) {
450 self.string_pool
451 .insert(InternedString(e.pool_id), s.to_string());
452 }
453 }
454 Ok(Some(DecodedFrameRef::StringPool(entries)))
455 }
456 FrameRef::StackPool(entries) => {
457 for e in &entries {
458 self.stack_pool
459 .insert(InternedStackFrames(e.pool_id), e.to_stack_frames());
460 }
461 Ok(Some(DecodedFrameRef::StackPool(entries)))
462 }
463 FrameRef::TimestampReset(ts) => {
464 self.timestamp_base_ns = ts;
465 self.next_frame_ref()
466 }
467 FrameRef::SchemaAnnotations {
468 type_id,
469 annotations,
470 } => {
471 if let Some(cache) = self
472 .schema_cache
473 .get_mut(type_id.0 as usize)
474 .and_then(|s| s.as_mut())
475 {
476 cache.entry.annotations.extend_from_slice(&annotations);
477 }
478 if let Some(entry) = self.registry.schemas.get_mut(&type_id) {
479 entry.annotations.extend_from_slice(&annotations);
480 }
481 Ok(Some(DecodedFrameRef::SchemaAnnotations {
482 type_id,
483 annotations,
484 }))
485 }
486 }
487 }
488
489 pub fn decode_all_ref(&mut self) -> Vec<DecodedFrameRef<'a>> {
491 let mut frames = Vec::new();
492 while let Ok(Some(f)) = self.next_frame_ref() {
493 frames.push(f);
494 }
495 frames
496 }
497
498 pub fn for_each_event(
507 &mut self,
508 mut f: impl for<'f> FnMut(RawEvent<'a, 'f>),
509 ) -> Result<(), DecodeError> {
510 self.try_for_each_event(|ev| {
511 f(ev);
512 Ok::<(), std::convert::Infallible>(())
513 })
514 .map_err(|e| match e {
515 TryForEachError::Decode(d) => d,
516 TryForEachError::User(inf) => match inf {},
517 })
518 }
519
520 pub fn try_for_each_event<E>(
523 &mut self,
524 mut f: impl for<'f> FnMut(RawEvent<'a, 'f>) -> Result<(), E>,
525 ) -> Result<(), TryForEachError<E>> {
526 let mut values_buf: Vec<FieldValueRef<'a>> = Vec::new();
527 while self.pos < self.data.len() {
528 let remaining = &self.data[self.pos..];
529 let tag = match remaining.first() {
530 Some(t) => *t,
531 None => break,
532 };
533 match tag {
534 codec::TAG_EVENT => {
535 let mut pos = 1;
536 let type_id = match remaining.get(pos..pos + 2) {
537 Some(b) => {
538 pos += 2;
539 WireTypeId(u16::from_le_bytes(b.try_into().unwrap()))
540 }
541 None => {
542 return Err(TryForEachError::Decode(DecodeError {
543 pos: self.pos,
544 message: "truncated event frame".into(),
545 }));
546 }
547 };
548 let cache = match self
549 .schema_cache
550 .get(type_id.0 as usize)
551 .and_then(|s| s.as_ref())
552 {
553 Some(c) => c,
554 None => {
555 return Err(TryForEachError::Decode(DecodeError {
556 pos: self.pos,
557 message: format!("unknown type_id {type_id:?}"),
558 }));
559 }
560 };
561
562 let timestamp_ns = if cache.entry.has_timestamp {
563 match codec::decode_u24_le(&remaining[pos..]) {
564 Some(delta) => {
565 pos += 3;
566 Some(self.timestamp_base_ns + delta as u64)
567 }
568 None => {
569 return Err(TryForEachError::Decode(DecodeError {
570 pos: self.pos + pos,
571 message: "truncated timestamp delta".into(),
572 }));
573 }
574 }
575 } else {
576 None
577 };
578
579 values_buf.clear();
580 for &ftag in &cache.field_tags {
581 let inner_type = match FieldType::from_tag(ftag) {
582 Some(ft) => ft,
583 None => {
584 return Err(TryForEachError::Decode(DecodeError {
585 pos: self.pos + pos,
586 message: format!("unknown field type tag {ftag:#x}"),
587 }));
588 }
589 };
590 if inner_type.is_optional() {
591 match remaining.get(pos) {
592 Some(0x00) => {
593 values_buf.push(FieldValueRef::None);
594 pos += 1;
595 }
596 Some(_) => {
597 pos += 1;
598 match FieldValueRef::decode(inner_type.inner(), remaining, pos)
599 {
600 Some((val, consumed)) => {
601 values_buf.push(val);
602 pos += consumed;
603 }
604 None => {
605 return Err(TryForEachError::Decode(DecodeError {
606 pos: self.pos + pos,
607 message: "truncated optional field value".into(),
608 }));
609 }
610 }
611 }
612 None => {
613 return Err(TryForEachError::Decode(DecodeError {
614 pos: self.pos + pos,
615 message: "truncated optional field prefix".into(),
616 }));
617 }
618 }
619 } else {
620 match FieldValueRef::decode(inner_type, remaining, pos) {
621 Some((val, consumed)) => {
622 values_buf.push(val);
623 pos += consumed;
624 }
625 None => {
626 return Err(TryForEachError::Decode(DecodeError {
627 pos: self.pos + pos,
628 message: "truncated field value".into(),
629 }));
630 }
631 }
632 }
633 }
634 {
640 let Self {
641 pos: self_pos,
642 timestamp_base_ns,
643 ..
644 } = self;
645 *self_pos += pos;
646 if let Some(ts) = timestamp_ns {
647 *timestamp_base_ns = ts;
648 }
649 }
650 f(RawEvent {
651 type_id,
652 name: &cache.entry.name,
653 timestamp_ns,
654 fields: &values_buf,
655 schema: &cache.entry,
656 string_pool: &self.string_pool,
657 stack_pool: &self.stack_pool,
658 })
659 .map_err(TryForEachError::User)?;
660 }
661 codec::TAG_TIMESTAMP_RESET => {
662 let ts = match self.data.get(self.pos + 1..self.pos + 9) {
663 Some(b) => u64::from_le_bytes(b.try_into().unwrap()),
664 None => {
665 return Err(TryForEachError::Decode(DecodeError {
666 pos: self.pos,
667 message: "truncated timestamp reset".into(),
668 }));
669 }
670 };
671 self.timestamp_base_ns = ts;
672 self.pos += 9;
673 }
674 _ => {
675 if tag == codec::MAGIC[0] && self.try_consume_reset_header() {
677 continue;
678 }
679 match self.next_frame_ref() {
680 Ok(Some(_)) => {}
681 Ok(None) => {
682 return Err(TryForEachError::Decode(DecodeError {
683 pos: self.pos,
684 message: format!("failed to decode frame with tag 0x{tag:02x}"),
685 }));
686 }
687 Err(e) => return Err(TryForEachError::Decode(e)),
688 }
689 }
690 }
691 }
692 Ok(())
693 }
694
/// Returns an iterator that borrows this decoder and yields only event
/// frames (and decode errors), skipping every other frame kind.
pub fn events(&mut self) -> EventIter<'_, 'a> {
    EventIter { decoder: self }
}
701}
702
703impl<'a> Iterator for Decoder<'a> {
704 type Item = Result<DecodedFrameRef<'a>, DecodeError>;
705
706 fn next(&mut self) -> Option<Self::Item> {
707 self.next_frame_ref().transpose()
708 }
709}
710
/// Iterator returned by [`Decoder::events`]; yields only event frames.
pub struct EventIter<'d, 'a> {
    /// The decoder being driven; non-event frames are still decoded through it.
    decoder: &'d mut Decoder<'a>,
}
716
717impl<'d, 'a> Iterator for EventIter<'d, 'a> {
718 type Item = Result<DecodedFrameRef<'a>, DecodeError>;
719
720 fn next(&mut self) -> Option<Self::Item> {
721 loop {
722 match self.decoder.next()? {
723 Ok(frame @ DecodedFrameRef::Event { .. }) => return Some(Ok(frame)),
724 Ok(_) => continue, Err(e) => return Some(Err(e)),
726 }
727 }
728 }
729}
730
#[cfg(test)]
mod tests {
    use super::*;
    use crate::encoder::Encoder;
    use crate::schema::FieldDef;
    use crate::types::{FieldType, FieldValue};

    /// Field list used by every test schema: a single varint field named `v`.
    fn single_varint_field() -> Vec<FieldDef> {
        vec![FieldDef {
            name: "v".into(),
            field_type: FieldType::Varint,
        }]
    }

    /// A stream holding only the header decodes to no frames.
    #[test]
    fn decode_empty_stream() {
        let bytes = Encoder::new().finish();
        let mut decoder = Decoder::new(&bytes).unwrap();
        assert_eq!(decoder.version(), 1);
        assert!(decoder.next_frame().unwrap().is_none());
    }

    /// Registering a schema produces a schema frame carrying its name.
    #[test]
    fn decode_schema_frame() {
        let mut encoder = Encoder::new();
        encoder
            .register_schema("Ev", single_varint_field())
            .unwrap();
        let bytes = encoder.finish();

        let mut decoder = Decoder::new(&bytes).unwrap();
        let first = decoder.next_frame().unwrap().unwrap();
        assert!(matches!(first, DecodedFrame::Schema(s) if s.name == "Ev"));
    }

    /// An event written after its schema decodes with only the field value;
    /// the first written value is not part of `values`.
    #[test]
    fn decode_event_after_schema() {
        let mut encoder = Encoder::new();
        let schema = encoder
            .register_schema("Ev", single_varint_field())
            .unwrap();
        encoder
            .write_event(
                &schema,
                &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
            )
            .unwrap();
        let bytes = encoder.finish();

        let mut decoder = Decoder::new(&bytes).unwrap();
        let decoded = decoder.decode_all();
        assert_eq!(decoded.len(), 2);
        match &decoded[1] {
            DecodedFrame::Event { values, .. } => {
                assert_eq!(*values, vec![FieldValue::Varint(42)]);
            }
            _ => panic!("expected event"),
        }
    }

    /// Decoding a string-pool frame makes the string available by id.
    #[test]
    fn decode_string_pool_builds_map() {
        let mut encoder = Encoder::new();
        let interned = encoder.intern_string("hello").unwrap();
        let bytes = encoder.finish();

        let mut decoder = Decoder::new(&bytes).unwrap();
        decoder.decode_all();
        assert_eq!(decoder.string_pool().get(interned), Some("hello"));
    }

    /// One schema frame plus ten event frames are all returned.
    #[test]
    fn decode_multiple_events() {
        let mut encoder = Encoder::new();
        let schema = encoder
            .register_schema("Ev", single_varint_field())
            .unwrap();
        for n in 0..10u64 {
            encoder
                .write_event(
                    &schema,
                    &[FieldValue::Varint(n * 1000), FieldValue::Varint(n)],
                )
                .unwrap();
        }
        let bytes = encoder.finish();

        let mut decoder = Decoder::new(&bytes).unwrap();
        let decoded = decoder.decode_all();
        assert_eq!(decoded.len(), 11);
    }

    /// Input that does not start with a valid header is rejected.
    #[test]
    fn bad_header_returns_none() {
        assert!(Decoder::new(&[0x00, 0x00, 0x00, 0x00, 1]).is_none());
    }

    /// The `Iterator` impl yields the schema frame followed by the events.
    #[test]
    fn iterator_yields_all_frames() {
        let mut encoder = Encoder::new();
        let schema = encoder
            .register_schema("Ev", single_varint_field())
            .unwrap();
        for n in 0..3u64 {
            encoder
                .write_event(
                    &schema,
                    &[FieldValue::Varint(n * 1000), FieldValue::Varint(n)],
                )
                .unwrap();
        }
        let bytes = encoder.finish();

        let decoder = Decoder::new(&bytes).unwrap();
        let decoded: Vec<_> = decoder.collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(decoded.len(), 4);
        assert!(matches!(decoded[0], DecodedFrameRef::Schema(_)));
        assert!(matches!(decoded[1], DecodedFrameRef::Event { .. }));
    }

    /// Taking a prefix of the iterator leaves the decoder usable afterwards.
    #[test]
    fn iterator_early_termination() {
        let mut encoder = Encoder::new();
        let schema = encoder
            .register_schema("Ev", single_varint_field())
            .unwrap();
        for n in 0..10u64 {
            encoder
                .write_event(
                    &schema,
                    &[FieldValue::Varint(n * 1000), FieldValue::Varint(n)],
                )
                .unwrap();
        }
        let bytes = encoder.finish();

        let mut decoder = Decoder::new(&bytes).unwrap();
        let prefix: Vec<_> = decoder
            .by_ref()
            .take(2)
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(prefix.len(), 2);
        let following = decoder.next();
        assert!(following.is_some());
    }

    /// `events()` yields only event frames, skipping the schema frame.
    #[test]
    fn events_iterator_skips_schema() {
        let mut encoder = Encoder::new();
        let schema = encoder
            .register_schema("Ev", single_varint_field())
            .unwrap();
        for (stamp, value) in [(1_000u64, 42u64), (2_000, 99)] {
            encoder
                .write_event(
                    &schema,
                    &[FieldValue::Varint(stamp), FieldValue::Varint(value)],
                )
                .unwrap();
        }
        let bytes = encoder.finish();

        let mut decoder = Decoder::new(&bytes).unwrap();
        let only_events: Vec<_> = decoder.events().collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(only_events.len(), 2);
        assert!(only_events
            .iter()
            .all(|frame| matches!(frame, DecodedFrameRef::Event { .. })));
    }

    /// The first item from `events()` is an event even though a schema frame precedes it.
    #[test]
    fn events_iterator_first_event_only() {
        let mut encoder = Encoder::new();
        let schema = encoder
            .register_schema("Ev", single_varint_field())
            .unwrap();
        for n in 0..5u64 {
            encoder
                .write_event(
                    &schema,
                    &[FieldValue::Varint(n * 1000), FieldValue::Varint(n)],
                )
                .unwrap();
        }
        let bytes = encoder.finish();

        let mut decoder = Decoder::new(&bytes).unwrap();
        let head = decoder.events().next().unwrap().unwrap();
        assert!(matches!(head, DecodedFrameRef::Event { .. }));
    }
}