1#[repr(u8)]
13#[derive(Debug, Clone, Copy, PartialEq, Eq)]
14pub enum Kind {
15 Handshake = 0,
16 RawFrame = 1,
17 ZstdBatch = 2,
18 PlainBatch = 3,
19 EpochBoundary = 4,
20 StreamError = 5,
21 StreamClosed = 6,
22 Heartbeat = 7,
23 AddCredit = 10,
24 Cancel = 11,
25 Pong = 12,
26}
27
28impl Kind {
29 pub fn from_u8(v: u8) -> Option<Self> {
30 Some(match v {
31 0 => Kind::Handshake,
32 1 => Kind::RawFrame,
33 2 => Kind::ZstdBatch,
34 3 => Kind::PlainBatch,
35 4 => Kind::EpochBoundary,
36 5 => Kind::StreamError,
37 6 => Kind::StreamClosed,
38 7 => Kind::Heartbeat,
39 10 => Kind::AddCredit,
40 11 => Kind::Cancel,
41 12 => Kind::Pong,
42 _ => return None,
43 })
44 }
45
46 pub fn has_checksum(self) -> bool {
49 !matches!(self, Kind::Handshake)
50 }
51}
52
53#[derive(Debug, Clone, Copy, PartialEq, Eq)]
54#[repr(u8)]
55pub enum AuthKind {
56 None = 0,
57 Bearer = 1,
58 Mtls = 2,
59}
60
61impl AuthKind {
62 pub fn from_u8(v: u8) -> Option<Self> {
63 match v {
64 0 => Some(AuthKind::None),
65 1 => Some(AuthKind::Bearer),
66 2 => Some(AuthKind::Mtls),
67 _ => None,
68 }
69 }
70}
71
72#[derive(Debug, Clone, Copy, PartialEq, Eq)]
73#[repr(u8)]
74pub enum ChosenMode {
75 RawFrame = 1,
76 ZstdBatch = 2,
77 PlainBatch = 3,
78}
79
80impl ChosenMode {
81 pub fn from_u8(v: u8) -> Option<Self> {
82 match v {
83 1 => Some(ChosenMode::RawFrame),
84 2 => Some(ChosenMode::ZstdBatch),
85 3 => Some(ChosenMode::PlainBatch),
86 _ => None,
87 }
88 }
89}
90
91#[derive(Debug, Clone, Copy, PartialEq, Eq)]
92#[repr(u8)]
93pub enum HandshakeStatus {
94 Ok = 0,
95 Error = 1,
98}
99
100impl HandshakeStatus {
101 pub fn from_u8(v: u8) -> Self {
102 match v {
103 0 => HandshakeStatus::Ok,
104 _ => HandshakeStatus::Error,
105 }
106 }
107}
108
109#[derive(Debug, Clone, Copy, PartialEq, Eq)]
110#[repr(u8)]
111pub enum StreamErrorCode {
112 Unspecified = 0,
113 MalformedRecord = 1,
114 OversizedRecord = 2,
115 IoError = 3,
116 SlowConsumer = 4,
117 ServerShutdown = 5,
118 AuthDenied = 6,
119 ResourceExhausted = 7,
120 Internal = 8,
121}
122
123impl StreamErrorCode {
124 pub fn from_u8(v: u8) -> Self {
125 match v {
126 1 => StreamErrorCode::MalformedRecord,
127 2 => StreamErrorCode::OversizedRecord,
128 3 => StreamErrorCode::IoError,
129 4 => StreamErrorCode::SlowConsumer,
130 5 => StreamErrorCode::ServerShutdown,
131 6 => StreamErrorCode::AuthDenied,
132 7 => StreamErrorCode::ResourceExhausted,
133 8 => StreamErrorCode::Internal,
134 _ => StreamErrorCode::Unspecified,
135 }
136 }
137}
138
139#[derive(Debug, Clone, PartialEq, Eq)]
141pub struct BatchPayload {
142 pub batch_id: u64,
143 pub epoch: u32,
144 pub records: Vec<Vec<u8>>,
145}
146
147#[derive(Debug, Clone, PartialEq, Eq)]
148pub struct ClientHello {
149 pub capability_flags: u8,
152 pub auth_kind: AuthKind,
153 pub auth: Vec<u8>,
155 pub open_stream: Vec<u8>,
158}
159
160#[derive(Debug, Clone, PartialEq, Eq)]
161pub struct ServerHello {
162 pub status: HandshakeStatus,
163 pub chosen_mode: Option<ChosenMode>,
164 pub initial_credit: u64,
165 pub server_version: u8,
166 pub max_message_bytes: u32,
167 pub stream_opened: Vec<u8>,
170}
171
172#[derive(Debug, Clone, PartialEq, Eq)]
174pub enum Message {
175 ClientHello(ClientHello),
176 ServerHello(ServerHello),
177 RawFrame {
178 frame_id: u32,
179 perm_seed: [u8; 32],
184 zstd_bytes: Vec<u8>,
185 },
186 ZstdBatch {
187 batch_id: u64,
188 epoch: u32,
189 n_records: u32,
190 zstd_bytes: Vec<u8>,
191 },
192 PlainBatch(BatchPayload),
193 EpochBoundary {
194 completed_epoch: u32,
195 records_in_epoch: u64,
196 },
197 StreamError {
198 code: StreamErrorCode,
199 fatal: bool,
200 detail: Vec<u8>,
201 },
202 StreamClosed {
203 total_records: u64,
204 epochs_completed: u32,
205 },
206 Heartbeat {
207 now_unix_nanos: u64,
208 },
209 AddCredit {
210 add_bytes: u64,
211 },
212 Cancel {
213 reason: Vec<u8>,
214 },
215 Pong {
216 now_unix_nanos: u64,
217 },
218}
219
220impl Message {
221 pub fn kind(&self) -> Kind {
222 match self {
223 Message::ClientHello(_) | Message::ServerHello(_) => Kind::Handshake,
224 Message::RawFrame { .. } => Kind::RawFrame,
225 Message::ZstdBatch { .. } => Kind::ZstdBatch,
226 Message::PlainBatch(_) => Kind::PlainBatch,
227 Message::EpochBoundary { .. } => Kind::EpochBoundary,
228 Message::StreamError { .. } => Kind::StreamError,
229 Message::StreamClosed { .. } => Kind::StreamClosed,
230 Message::Heartbeat { .. } => Kind::Heartbeat,
231 Message::AddCredit { .. } => Kind::AddCredit,
232 Message::Cancel { .. } => Kind::Cancel,
233 Message::Pong { .. } => Kind::Pong,
234 }
235 }
236}