probius/
decode.rs

1use mproto::BaseLen;
2
3pub struct DecodeEvents<'a> {
4    buf: &'a [u8],
5    offset: usize,
6}
7
8impl<'a> DecodeEvents<'a> {
9    pub fn new(buf: &'a [u8]) -> Self {
10        Self { buf, offset: 0 }
11    }
12}
13
14impl<'a> Iterator for DecodeEvents<'a> {
15    type Item = DecodeEvent<'a>;
16
17    fn next(&mut self) -> Option<Self::Item> {
18        if self.offset == self.buf.len() {
19            return None;
20        }
21
22        let buffer_offset = self.offset;
23        let event: probius_mproto::EventHeader =
24            mproto::decode_value(&self.buf[buffer_offset..]).ok()?;
25        self.offset += probius_mproto::EventHeader::BASE_LEN;
26        let body_start = self.offset;
27        self.offset += event.len as usize;
28        let buffer_body_len = event.len as usize;
29
30        match event.kind {
31            probius_mproto::EventKind::CreateSource => {
32                let payload = mproto::decode_value(&self.buf[body_start..]).ok()?;
33                Some(DecodeEvent {
34                    buffer_offset,
35                    buffer_body_len,
36                    kind: event.kind,
37                    id: event.id,
38                    body: DecodeEventBody::CreateSource(payload),
39                })
40            }
41            probius_mproto::EventKind::DeleteSource => {
42                Some(DecodeEvent {
43                    buffer_offset,
44                    buffer_body_len,
45                    kind: event.kind,
46                    id: event.id,
47                    body: DecodeEventBody::DeleteSource,
48                })
49            }
50            probius_mproto::EventKind::Trace => {
51                let header = mproto::decode_value(&self.buf[body_start..]).ok()?;
52                Some(DecodeEvent {
53                    buffer_offset,
54                    buffer_body_len,
55                    kind: event.kind,
56                    id: event.id,
57                    body: DecodeEventBody::Trace { header },
58                })
59            }
60            probius_mproto::EventKind::TraceAggregate => {
61                let header = mproto::decode_value(&self.buf[body_start..]).ok()?;
62                Some(DecodeEvent {
63                    buffer_offset,
64                    buffer_body_len,
65                    kind: event.kind,
66                    id: event.id,
67                    body: DecodeEventBody::TraceAggregate { header },
68                })
69            }
70            _ => todo!(),
71        }
72    }
73}
74
75#[derive(Clone, Debug)]
76pub struct DecodeEvent<'a> {
77    // Raw byte offset and length of this event in the `DecodeEvents` buffer
78    pub buffer_offset: usize,
79    pub buffer_body_len: usize,
80
81    pub kind: probius_mproto::EventKind,
82    pub id: probius_mproto::EventId,
83    pub body: DecodeEventBody<'a>,
84}
85
86#[derive(Clone, Debug)]
87pub enum DecodeEventBody<'a> {
88    CreateSource(probius_mproto::CreateSourceLazy<'a>),
89    DeleteSource,
90    Trace {
91        header: probius_mproto::TraceLazy<'a>,
92    },
93    TraceAggregate {
94        header: probius_mproto::TraceAggregateLazy<'a>,
95    },
96    TraceAggregateDelta(probius_mproto::TraceAggregateLazy<'a>),
97}
98