1use crate::codec::{
4 self, Frame, FrameRef, HEADER_SIZE, PoolEntry, PoolEntryRef, SchemaInfo, WireTypeId,
5};
6use crate::schema::{SchemaEntry, SchemaRegistry};
7use crate::types::{FieldType, FieldValueRef, InternedString};
8use std::collections::HashMap;
9use std::fmt;
10
/// Error reported when the decoder cannot make sense of the input bytes.
#[derive(Debug, Clone)]
pub struct DecodeError {
    /// Byte offset into the input that the error is reported against.
    pub pos: usize,
    /// Human-readable description of what went wrong.
    pub message: String,
}

impl fmt::Display for DecodeError {
    /// Renders the error as `decode error at byte <pos>: <message>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { pos, message } = self;
        write!(f, "decode error at byte {}: {}", pos, message)
    }
}

impl std::error::Error for DecodeError {}
27
28#[non_exhaustive]
33pub struct RawEvent<'a, 'f> {
34 pub type_id: WireTypeId,
35 pub name: &'f str,
36 pub timestamp_ns: Option<u64>,
37 pub fields: &'f [FieldValueRef<'a>],
38 pub string_pool: &'f StringPool,
39}
40
/// Lookup table from [`InternedString`] ids to the text they stand for.
///
/// The decoder fills it from string-pool frames as they are decoded.
#[derive(Debug, Clone, Default)]
pub struct StringPool(pub(crate) HashMap<InternedString, String>);
48
49impl StringPool {
50 pub(crate) fn new() -> Self {
51 Self(HashMap::new())
52 }
53
54 pub(crate) fn insert(&mut self, id: InternedString, value: String) {
55 self.0.insert(id, value);
56 }
57
58 pub fn get(&self, id: InternedString) -> Option<&str> {
59 self.0.get(&id).map(|s| s.as_str())
60 }
61
62 pub fn len(&self) -> usize {
63 self.0.len()
64 }
65
66 pub fn is_empty(&self) -> bool {
67 self.0.is_empty()
68 }
69
70 pub fn iter(&self) -> impl Iterator<Item = (InternedString, &str)> {
72 self.0.iter().map(|(&id, v)| (id, v.as_str()))
73 }
74}
75
/// Owned frame returned by [`Decoder::next_frame`].
///
/// Timestamp-reset frames have no variant here: the decoder applies them to
/// its timestamp base and moves on to the following frame.
#[derive(Debug, Clone, PartialEq)]
pub enum DecodedFrame {
    /// A schema definition; the decoder registers it before returning it.
    Schema(SchemaEntry),
    /// An event of a previously registered schema.
    Event {
        /// Wire type id of the event's schema.
        type_id: WireTypeId,
        /// Timestamp reported by the codec for this event, if any.
        timestamp_ns: Option<u64>,
        /// Decoded, owned field values.
        values: Vec<crate::types::FieldValue>,
    },
    /// A batch of string-pool entries, returned as decoded (including entries
    /// whose bytes are not valid UTF-8 and were therefore not added to the
    /// decoder's pool).
    StringPool(Vec<PoolEntry>),
}
88
/// Borrowing counterpart of [`DecodedFrame`], returned by
/// [`Decoder::next_frame_ref`] and by the decoder's iterators.
///
/// Values and pool entries borrow from the input bytes (`'a`) where the
/// underlying reference types allow it.
#[derive(Debug, Clone, PartialEq)]
pub enum DecodedFrameRef<'a> {
    /// A schema definition; the decoder registers it before returning it.
    Schema(SchemaEntry),
    /// An event of a previously registered schema.
    Event {
        /// Wire type id of the event's schema.
        type_id: WireTypeId,
        /// Timestamp reported by the codec for this event, if any.
        timestamp_ns: Option<u64>,
        /// Decoded field values.
        values: Vec<FieldValueRef<'a>>,
    },
    /// A batch of string-pool entries, returned as decoded.
    StringPool(Vec<PoolEntryRef<'a>>),
}
100
/// Per-schema data the decoder keeps at hand for decoding events: the schema
/// name, the flattened list of field types, and whether events carry a
/// timestamp.
struct SchemaCache {
    /// Schema name, copied from the registered `SchemaEntry`.
    name: String,
    /// Field types in schema order, one per field of the `SchemaEntry`.
    field_types: Vec<FieldType>,
    /// Copied from `SchemaEntry::has_timestamp`.
    has_timestamp: bool,
}
106
/// Stateful decoder over an in-memory byte stream.
///
/// Frames are decoded one at a time; schemas, string-pool entries and the
/// timestamp base seen so far are remembered and used for later frames.
pub struct Decoder<'a> {
    /// The complete input, header included.
    data: &'a [u8],
    /// Byte offset of the next frame to decode.
    pos: usize,
    /// Registry of every schema frame decoded so far.
    registry: SchemaRegistry,
    /// Decoding-oriented copy of the registered schemas, keyed by wire type id.
    schema_cache: HashMap<WireTypeId, SchemaCache>,
    /// Strings collected from string-pool frames.
    string_pool: StringPool,
    /// Version value returned by `codec::decode_header`.
    version: u8,
    /// Base that event timestamp deltas are applied to; updated by
    /// timestamped events and by timestamp-reset frames.
    timestamp_base_ns: u64,
}
116
117impl<'a> Decoder<'a> {
118 pub fn new(data: &'a [u8]) -> Option<Self> {
119 let version = codec::decode_header(data)?;
120 Some(Self {
121 data,
122 pos: HEADER_SIZE,
123 registry: SchemaRegistry::new(),
124 schema_cache: HashMap::new(),
125 string_pool: StringPool::new(),
126 version,
127 timestamp_base_ns: 0,
128 })
129 }
130
    /// Schemas registered from the schema frames decoded so far.
    pub fn registry(&self) -> &SchemaRegistry {
        &self.registry
    }
134
    /// Version value obtained from the stream header when the decoder was
    /// created.
    pub fn version(&self) -> u8 {
        self.version
    }
138
    /// Strings collected from the string-pool frames decoded so far.
    pub fn string_pool(&self) -> &StringPool {
        &self.string_pool
    }
142
143 pub fn into_encoder<W: std::io::Write>(self, writer: W) -> crate::encoder::Encoder<W> {
150 crate::encoder::Encoder::from_decoder(
151 self.registry,
152 self.string_pool,
153 self.timestamp_base_ns,
154 writer,
155 )
156 }
157
158 fn schema_info(&self, type_id: WireTypeId) -> Option<SchemaInfo<'_>> {
159 self.schema_cache.get(&type_id).map(|c| SchemaInfo {
160 field_types: &c.field_types,
161 has_timestamp: c.has_timestamp,
162 })
163 }
164
165 fn register_schema(&mut self, type_id: WireTypeId, entry: SchemaEntry) -> Result<(), String> {
166 self.schema_cache.insert(
167 type_id,
168 SchemaCache {
169 name: entry.name.clone(),
170 field_types: entry.fields.iter().map(|f| f.field_type).collect(),
171 has_timestamp: entry.has_timestamp,
172 },
173 );
174 self.registry.register(type_id, entry)
175 }
176
177 pub fn next_frame(&mut self) -> Result<Option<DecodedFrame>, DecodeError> {
181 if self.pos >= self.data.len() {
182 return Ok(None);
183 }
184 let remaining = &self.data[self.pos..];
185 let base = self.timestamp_base_ns;
186 let (frame, consumed) =
187 match codec::decode_frame(remaining, |type_id| self.schema_info(type_id), base) {
188 Some(r) => r,
189 None => return Ok(None),
190 };
191 self.pos += consumed;
192 match frame {
193 Frame::Schema { type_id, entry } => {
194 let result = DecodedFrame::Schema(entry.clone());
195 self.register_schema(type_id, entry)
196 .map_err(|msg| DecodeError {
197 pos: self.pos,
198 message: msg,
199 })?;
200 Ok(Some(result))
201 }
202 Frame::Event {
203 type_id,
204 timestamp_ns,
205 values,
206 } => {
207 if let Some(ts) = timestamp_ns {
208 self.timestamp_base_ns = ts;
209 }
210 Ok(Some(DecodedFrame::Event {
211 type_id,
212 timestamp_ns,
213 values,
214 }))
215 }
216 Frame::StringPool(entries) => {
217 for e in &entries {
218 if let Ok(s) = String::from_utf8(e.data.clone()) {
219 self.string_pool.insert(InternedString(e.pool_id), s);
220 }
221 }
222 Ok(Some(DecodedFrame::StringPool(entries)))
223 }
224 Frame::TimestampReset(ts) => {
225 self.timestamp_base_ns = ts;
226 self.next_frame() }
228 }
229 }
230
231 pub fn decode_all(&mut self) -> Vec<DecodedFrame> {
233 let mut frames = Vec::new();
234 while let Ok(Some(f)) = self.next_frame() {
235 frames.push(f);
236 }
237 frames
238 }
239
240 pub fn next_frame_ref(&mut self) -> Result<Option<DecodedFrameRef<'a>>, DecodeError> {
243 if self.pos >= self.data.len() {
244 return Ok(None);
245 }
246 let remaining = &self.data[self.pos..];
247 let base = self.timestamp_base_ns;
248 let (frame, consumed) =
249 match codec::decode_frame_ref(remaining, |type_id| self.schema_info(type_id), base) {
250 Some(r) => r,
251 None => return Ok(None),
252 };
253 self.pos += consumed;
254 match frame {
255 FrameRef::Schema { type_id, entry } => {
256 let result = DecodedFrameRef::Schema(entry.clone());
257 self.register_schema(type_id, entry)
258 .map_err(|msg| DecodeError {
259 pos: self.pos,
260 message: msg,
261 })?;
262 Ok(Some(result))
263 }
264 FrameRef::Event {
265 type_id,
266 timestamp_ns,
267 values,
268 } => {
269 if let Some(ts) = timestamp_ns {
270 self.timestamp_base_ns = ts;
271 }
272 Ok(Some(DecodedFrameRef::Event {
273 type_id,
274 timestamp_ns,
275 values,
276 }))
277 }
278 FrameRef::StringPool(entries) => {
279 for e in &entries {
280 if let Ok(s) = std::str::from_utf8(e.data) {
281 self.string_pool
282 .insert(InternedString(e.pool_id), s.to_string());
283 }
284 }
285 Ok(Some(DecodedFrameRef::StringPool(entries)))
286 }
287 FrameRef::TimestampReset(ts) => {
288 self.timestamp_base_ns = ts;
289 self.next_frame_ref()
290 }
291 }
292 }
293
294 pub fn decode_all_ref(&mut self) -> Vec<DecodedFrameRef<'a>> {
296 let mut frames = Vec::new();
297 while let Ok(Some(f)) = self.next_frame_ref() {
298 frames.push(f);
299 }
300 frames
301 }
302
303 pub fn for_each_event(
312 &mut self,
313 mut f: impl for<'f> FnMut(RawEvent<'a, 'f>),
314 ) -> Result<(), DecodeError> {
315 let mut values_buf: Vec<FieldValueRef<'a>> = Vec::new();
316 while self.pos < self.data.len() {
317 let remaining = &self.data[self.pos..];
318 let tag = match remaining.first() {
319 Some(t) => *t,
320 None => break,
321 };
322 match tag {
323 codec::TAG_EVENT => {
324 let mut pos = 1;
325 let type_id = match remaining.get(pos..pos + 2) {
326 Some(b) => {
327 pos += 2;
328 WireTypeId(u16::from_le_bytes(b.try_into().unwrap()))
329 }
330 None => {
331 return Err(DecodeError {
332 pos: self.pos,
333 message: "truncated event frame".into(),
334 });
335 }
336 };
337 let cache = match self.schema_cache.get(&type_id) {
338 Some(c) => c,
339 None => {
340 return Err(DecodeError {
341 pos: self.pos,
342 message: format!("unknown type_id {type_id:?}"),
343 });
344 }
345 };
346
347 let timestamp_ns = if cache.has_timestamp {
348 match codec::decode_u24_le(&remaining[pos..]) {
349 Some(delta) => {
350 pos += 3;
351 Some(self.timestamp_base_ns + delta as u64)
352 }
353 None => {
354 return Err(DecodeError {
355 pos: self.pos + pos,
356 message: "truncated timestamp delta".into(),
357 });
358 }
359 }
360 } else {
361 None
362 };
363
364 values_buf.clear();
365 for ft in &cache.field_types {
366 match FieldValueRef::decode(*ft, remaining, pos) {
367 Some((val, consumed)) => {
368 values_buf.push(val);
369 pos += consumed;
370 }
371 None => {
372 return Err(DecodeError {
373 pos: self.pos + pos,
374 message: "truncated field value".into(),
375 });
376 }
377 }
378 }
379 self.pos += pos;
380 if let Some(ts) = timestamp_ns {
381 self.timestamp_base_ns = ts;
382 }
383 f(RawEvent {
384 type_id,
385 name: &cache.name,
386 timestamp_ns,
387 fields: &values_buf,
388 string_pool: &self.string_pool,
389 });
390 }
391 codec::TAG_TIMESTAMP_RESET => {
392 let ts = match self.data.get(self.pos + 1..self.pos + 9) {
395 Some(b) => u64::from_le_bytes(b.try_into().unwrap()),
396 None => {
397 return Err(DecodeError {
398 pos: self.pos,
399 message: "truncated timestamp reset".into(),
400 });
401 }
402 };
403 self.timestamp_base_ns = ts;
404 self.pos += 9;
405 }
406 _ => {
407 match self.next_frame_ref() {
410 Ok(Some(_)) => {}
411 Ok(None) => {
412 return Err(DecodeError {
413 pos: self.pos,
414 message: format!("failed to decode frame with tag 0x{tag:02x}"),
415 });
416 }
417 Err(e) => return Err(e),
418 }
419 }
420 }
421 }
422 Ok(())
423 }
424
    /// Returns an iterator over the remaining frames that yields only event
    /// frames (and decode errors); all other frames are still decoded, for
    /// their effect on the decoder state, but are not yielded.
    pub fn events(&mut self) -> EventIter<'_, 'a> {
        EventIter { decoder: self }
    }
431}
432
433impl<'a> Iterator for Decoder<'a> {
434 type Item = Result<DecodedFrameRef<'a>, DecodeError>;
435
436 fn next(&mut self) -> Option<Self::Item> {
437 self.next_frame_ref().transpose()
438 }
439}
440
/// Iterator returned by [`Decoder::events`]; yields only the event frames of
/// the decoder it borrows.
pub struct EventIter<'d, 'a> {
    /// The decoder being advanced; borrowed mutably for the iterator's lifetime.
    decoder: &'d mut Decoder<'a>,
}
446
447impl<'d, 'a> Iterator for EventIter<'d, 'a> {
448 type Item = Result<DecodedFrameRef<'a>, DecodeError>;
449
450 fn next(&mut self) -> Option<Self::Item> {
451 loop {
452 match self.decoder.next()? {
453 Ok(frame @ DecodedFrameRef::Event { .. }) => return Some(Ok(frame)),
454 Ok(_) => continue, Err(e) => return Some(Err(e)),
456 }
457 }
458 }
459}
460
#[cfg(test)]
mod tests {
    use super::*;
    use crate::encoder::Encoder;
    use crate::schema::FieldDef;
    use crate::types::{FieldType, FieldValue};

    /// Field list shared by every schema in these tests: one varint field `v`.
    fn varint_field() -> Vec<FieldDef> {
        vec![FieldDef {
            name: "v".into(),
            field_type: FieldType::Varint,
        }]
    }

    /// A stream holding only a header decodes to no frames.
    #[test]
    fn decode_empty_stream() {
        let enc = Encoder::new();
        let bytes = enc.finish();
        let mut decoder = Decoder::new(&bytes).unwrap();
        assert_eq!(decoder.version(), 1);
        assert!(decoder.next_frame().unwrap().is_none());
    }

    /// Registering a schema on the encoder produces a schema frame.
    #[test]
    fn decode_schema_frame() {
        let mut enc = Encoder::new();
        enc.register_schema("Ev", varint_field()).unwrap();
        let bytes = enc.finish();
        let mut decoder = Decoder::new(&bytes).unwrap();
        let first = decoder.next_frame().unwrap().unwrap();
        assert!(matches!(first, DecodedFrame::Schema(s) if s.name == "Ev"));
    }

    /// An event written after its schema decodes to the schema's field values.
    #[test]
    fn decode_event_after_schema() {
        let mut enc = Encoder::new();
        let schema = enc.register_schema("Ev", varint_field()).unwrap();
        enc.write_event(
            &schema,
            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
        )
        .unwrap();
        let bytes = enc.finish();

        let mut decoder = Decoder::new(&bytes).unwrap();
        let frames = decoder.decode_all();
        assert_eq!(frames.len(), 2);
        match &frames[1] {
            DecodedFrame::Event { values, .. } => {
                assert_eq!(*values, vec![FieldValue::Varint(42)]);
            }
            _ => panic!("expected event"),
        }
    }

    /// Interned strings end up in the decoder's string pool.
    #[test]
    fn decode_string_pool_builds_map() {
        let mut enc = Encoder::new();
        let interned = enc.intern_string("hello").unwrap();
        let bytes = enc.finish();

        let mut decoder = Decoder::new(&bytes).unwrap();
        decoder.decode_all();
        assert_eq!(decoder.string_pool().get(interned), Some("hello"));
    }

    /// One schema frame plus ten event frames are decoded.
    #[test]
    fn decode_multiple_events() {
        let mut enc = Encoder::new();
        let schema = enc.register_schema("Ev", varint_field()).unwrap();
        for n in 0..10u64 {
            enc.write_event(
                &schema,
                &[FieldValue::Varint(n * 1000), FieldValue::Varint(n)],
            )
            .unwrap();
        }
        let bytes = enc.finish();

        let mut decoder = Decoder::new(&bytes).unwrap();
        let frames = decoder.decode_all();
        assert_eq!(frames.len(), 11);
    }

    /// Input that does not start with a valid header is rejected.
    #[test]
    fn bad_header_returns_none() {
        assert!(Decoder::new(&[0x00, 0x00, 0x00, 0x00, 1]).is_none());
    }

    /// The decoder's `Iterator` impl yields the schema and every event.
    #[test]
    fn iterator_yields_all_frames() {
        let mut enc = Encoder::new();
        let schema = enc.register_schema("Ev", varint_field()).unwrap();
        for n in 0..3u64 {
            enc.write_event(
                &schema,
                &[FieldValue::Varint(n * 1000), FieldValue::Varint(n)],
            )
            .unwrap();
        }
        let bytes = enc.finish();

        let decoder = Decoder::new(&bytes).unwrap();
        let frames: Vec<_> = decoder.collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(frames.len(), 4);
        assert!(matches!(frames[0], DecodedFrameRef::Schema(_)));
        assert!(matches!(frames[1], DecodedFrameRef::Event { .. }));
    }

    /// Taking a prefix of the iterator leaves the remaining frames available.
    #[test]
    fn iterator_early_termination() {
        let mut enc = Encoder::new();
        let schema = enc.register_schema("Ev", varint_field()).unwrap();
        for n in 0..10u64 {
            enc.write_event(
                &schema,
                &[FieldValue::Varint(n * 1000), FieldValue::Varint(n)],
            )
            .unwrap();
        }
        let bytes = enc.finish();

        let mut decoder = Decoder::new(&bytes).unwrap();
        let first_two: Vec<_> = decoder
            .by_ref()
            .take(2)
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(first_two.len(), 2);
        let following = decoder.next();
        assert!(following.is_some());
    }

    /// `events()` yields only event frames, not the schema frame.
    #[test]
    fn events_iterator_skips_schema() {
        let mut enc = Encoder::new();
        let schema = enc.register_schema("Ev", varint_field()).unwrap();
        enc.write_event(
            &schema,
            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
        )
        .unwrap();
        enc.write_event(
            &schema,
            &[FieldValue::Varint(2_000), FieldValue::Varint(99)],
        )
        .unwrap();
        let bytes = enc.finish();

        let mut decoder = Decoder::new(&bytes).unwrap();
        let events: Vec<_> = decoder.events().collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(events.len(), 2);
        assert!(events
            .iter()
            .all(|ev| matches!(ev, DecodedFrameRef::Event { .. })));
    }

    /// The first item from `events()` is an event even though a schema frame
    /// precedes it in the stream.
    #[test]
    fn events_iterator_first_event_only() {
        let mut enc = Encoder::new();
        let schema = enc.register_schema("Ev", varint_field()).unwrap();
        for n in 0..5u64 {
            enc.write_event(
                &schema,
                &[FieldValue::Varint(n * 1000), FieldValue::Varint(n)],
            )
            .unwrap();
        }
        let bytes = enc.finish();

        let mut decoder = Decoder::new(&bytes).unwrap();
        let first = decoder.events().next().unwrap().unwrap();
        assert!(matches!(first, DecodedFrameRef::Event { .. }));
    }
}