1use mproto::BaseLen;
2
3pub struct DecodeEvents<'a> {
4 buf: &'a [u8],
5 offset: usize,
6}
7
8impl<'a> DecodeEvents<'a> {
9 pub fn new(buf: &'a [u8]) -> Self {
10 Self { buf, offset: 0 }
11 }
12}
13
14impl<'a> Iterator for DecodeEvents<'a> {
15 type Item = DecodeEvent<'a>;
16
17 fn next(&mut self) -> Option<Self::Item> {
18 if self.offset == self.buf.len() {
19 return None;
20 }
21
22 let buffer_offset = self.offset;
23 let event: probius_mproto::EventHeader =
24 mproto::decode_value(&self.buf[buffer_offset..]).ok()?;
25 self.offset += probius_mproto::EventHeader::BASE_LEN;
26 let body_start = self.offset;
27 self.offset += event.len as usize;
28 let buffer_body_len = event.len as usize;
29
30 match event.kind {
31 probius_mproto::EventKind::CreateSource => {
32 let payload = mproto::decode_value(&self.buf[body_start..]).ok()?;
33 Some(DecodeEvent {
34 buffer_offset,
35 buffer_body_len,
36 kind: event.kind,
37 id: event.id,
38 body: DecodeEventBody::CreateSource(payload),
39 })
40 }
41 probius_mproto::EventKind::DeleteSource => {
42 Some(DecodeEvent {
43 buffer_offset,
44 buffer_body_len,
45 kind: event.kind,
46 id: event.id,
47 body: DecodeEventBody::DeleteSource,
48 })
49 }
50 probius_mproto::EventKind::Trace => {
51 let header = mproto::decode_value(&self.buf[body_start..]).ok()?;
52 Some(DecodeEvent {
53 buffer_offset,
54 buffer_body_len,
55 kind: event.kind,
56 id: event.id,
57 body: DecodeEventBody::Trace { header },
58 })
59 }
60 probius_mproto::EventKind::TraceAggregate => {
61 let header = mproto::decode_value(&self.buf[body_start..]).ok()?;
62 Some(DecodeEvent {
63 buffer_offset,
64 buffer_body_len,
65 kind: event.kind,
66 id: event.id,
67 body: DecodeEventBody::TraceAggregate { header },
68 })
69 }
70 _ => todo!(),
71 }
72 }
73}
74
75#[derive(Clone, Debug)]
76pub struct DecodeEvent<'a> {
77 pub buffer_offset: usize,
79 pub buffer_body_len: usize,
80
81 pub kind: probius_mproto::EventKind,
82 pub id: probius_mproto::EventId,
83 pub body: DecodeEventBody<'a>,
84}
85
86#[derive(Clone, Debug)]
87pub enum DecodeEventBody<'a> {
88 CreateSource(probius_mproto::CreateSourceLazy<'a>),
89 DeleteSource,
90 Trace {
91 header: probius_mproto::TraceLazy<'a>,
92 },
93 TraceAggregate {
94 header: probius_mproto::TraceAggregateLazy<'a>,
95 },
96 TraceAggregateDelta(probius_mproto::TraceAggregateLazy<'a>),
97}
98