1use std::collections::HashMap;
27
28use bytes::{Buf, BytesMut};
29use thiserror::Error;
30
/// Size in bytes of the frame prelude: total length (4) + headers length (4) + prelude CRC (4).
const PRELUDE_LEN: usize = 4 + 4 + 4;
/// Size in bytes of the CRC32 trailer that closes every frame.
const MESSAGE_CRC_LEN: usize = std::mem::size_of::<u32>();
/// Smallest valid frame: a prelude immediately followed by the message CRC
/// (no headers, no payload).
const MIN_FRAME_LEN: usize = MESSAGE_CRC_LEN + PRELUDE_LEN;

// Wire type codes: the single byte that follows a header name and selects how
// the header value is encoded.

/// Boolean `true`; no value bytes follow.
const TYPE_BOOL_TRUE: u8 = 0;
/// Boolean `false`; no value bytes follow.
const TYPE_BOOL_FALSE: u8 = 1;
/// One signed byte.
const TYPE_BYTE: u8 = 2;
/// Big-endian signed 16-bit integer.
const TYPE_INT16: u8 = 3;
/// Big-endian signed 32-bit integer.
const TYPE_INT32: u8 = 4;
/// Big-endian signed 64-bit integer.
const TYPE_INT64: u8 = 5;
/// Byte array prefixed by a big-endian `u16` length.
const TYPE_BYTE_ARRAY: u8 = 6;
/// UTF-8 string prefixed by a big-endian `u16` byte length.
const TYPE_STRING: u8 = 7;
/// Timestamp carried as a big-endian signed 64-bit integer.
const TYPE_TIMESTAMP: u8 = 8;
/// 16 raw UUID bytes.
const TYPE_UUID: u8 = 9;
45
/// A typed value attached to a frame header.
///
/// Each variant corresponds to one of the one-byte wire type codes handled by
/// the header parser and by `encode_frame`.
#[derive(Debug, Clone, Eq, PartialEq)]
#[non_exhaustive]
pub enum EventStreamHeaderValue {
    /// Boolean; the wire type code itself carries the value, so no bytes follow it.
    Bool(bool),
    /// Single signed byte.
    Byte(i8),
    /// Signed 16-bit integer (big-endian on the wire).
    Int16(i16),
    /// Signed 32-bit integer (big-endian on the wire).
    Int32(i32),
    /// Signed 64-bit integer (big-endian on the wire).
    Int64(i64),
    /// Opaque bytes; the wire form is prefixed with a `u16` length.
    Bytes(Vec<u8>),
    /// UTF-8 text; the wire form is prefixed with a `u16` byte length.
    String(String),
    /// Timestamp stored as a raw signed 64-bit integer.
    ///
    /// NOTE(review): the unit is not established by this file (presumably
    /// milliseconds since the Unix epoch); confirm against the protocol spec.
    Timestamp(i64),
    /// 16 raw UUID bytes, kept in wire order.
    Uuid([u8; 16]),
}
69
70#[derive(Clone, Debug, PartialEq, Eq)]
72pub struct EventStreamHeader {
73 pub name: String,
75 pub value: EventStreamHeaderValue,
77}
78
79#[derive(Clone, Debug, PartialEq, Eq)]
81pub struct EventStreamFrame {
82 pub headers: Vec<EventStreamHeader>,
84 pub header_index: HashMap<String, EventStreamHeaderValue>,
87 pub payload: Vec<u8>,
89}
90
91impl EventStreamFrame {
92 pub fn event_type(&self) -> Option<&str> {
95 match self.header_index.get(":event-type") {
96 Some(EventStreamHeaderValue::String(s)) => Some(s),
97 _ => None,
98 }
99 }
100}
101
102#[derive(Debug, Error)]
104#[non_exhaustive]
105pub enum EventStreamParseError {
106 #[error("frame too short: total_length={0} (min {MIN_FRAME_LEN})")]
109 FrameTooShort(u32),
110
111 #[error("prelude CRC mismatch (computed {computed:#010x}, header {expected:#010x})")]
114 PreludeCrcMismatch {
115 computed: u32,
117 expected: u32,
119 },
120
121 #[error("message CRC mismatch (computed {computed:#010x}, trailer {expected:#010x})")]
123 MessageCrcMismatch {
124 computed: u32,
126 expected: u32,
128 },
129
130 #[error("unknown header value type: {0}")]
133 UnknownHeaderType(u8),
134
135 #[error("frame underrun reading {context}")]
138 Underrun {
139 context: &'static str,
142 },
143
144 #[error("non-UTF-8 string in header {0}")]
146 NonUtf8String(String),
147}
148
149#[derive(Default)]
151pub struct EventStreamDecoder {
152 buf: BytesMut,
153}
154
155impl EventStreamDecoder {
156 pub fn new() -> Self {
158 Self::default()
159 }
160
161 pub fn push(&mut self, chunk: &[u8]) {
163 self.buf.extend_from_slice(chunk);
164 }
165
166 pub fn next_frame(
172 &mut self,
173 ) -> std::result::Result<Option<EventStreamFrame>, EventStreamParseError> {
174 if self.buf.len() < PRELUDE_LEN {
175 return Ok(None);
176 }
177 let total_length = u32::from_be_bytes([self.buf[0], self.buf[1], self.buf[2], self.buf[3]]);
178 let total_usize = total_length as usize;
179 if total_usize < MIN_FRAME_LEN {
180 return Err(EventStreamParseError::FrameTooShort(total_length));
181 }
182 if self.buf.len() < total_usize {
183 return Ok(None);
184 }
185 let headers_length =
186 u32::from_be_bytes([self.buf[4], self.buf[5], self.buf[6], self.buf[7]]) as usize;
187 let prelude_crc_expected =
188 u32::from_be_bytes([self.buf[8], self.buf[9], self.buf[10], self.buf[11]]);
189 let prelude_crc_computed = crc32fast::hash(&self.buf[..8]);
190 if prelude_crc_computed != prelude_crc_expected {
191 return Err(EventStreamParseError::PreludeCrcMismatch {
192 computed: prelude_crc_computed,
193 expected: prelude_crc_expected,
194 });
195 }
196 let message_body_end = total_usize - MESSAGE_CRC_LEN;
198 let message_crc_expected = u32::from_be_bytes([
199 self.buf[message_body_end],
200 self.buf[message_body_end + 1],
201 self.buf[message_body_end + 2],
202 self.buf[message_body_end + 3],
203 ]);
204 let message_crc_computed = crc32fast::hash(&self.buf[..message_body_end]);
205 if message_crc_computed != message_crc_expected {
206 return Err(EventStreamParseError::MessageCrcMismatch {
207 computed: message_crc_computed,
208 expected: message_crc_expected,
209 });
210 }
211 let header_start = PRELUDE_LEN;
213 let header_end = header_start + headers_length;
214 let payload_start = header_end;
215 let payload_end = message_body_end;
216
217 let headers = parse_headers(&self.buf[header_start..header_end])?;
218 let payload = self.buf[payload_start..payload_end].to_vec();
219 let mut header_index = HashMap::with_capacity(headers.len());
220 for h in &headers {
221 header_index.insert(h.name.clone(), h.value.clone());
222 }
223 self.buf.advance(total_usize);
225 Ok(Some(EventStreamFrame {
226 headers,
227 header_index,
228 payload,
229 }))
230 }
231
232 pub fn has_residual(&self) -> bool {
235 !self.buf.is_empty()
236 }
237}
238
239fn parse_headers(
240 mut bytes: &[u8],
241) -> std::result::Result<Vec<EventStreamHeader>, EventStreamParseError> {
242 let mut out = Vec::new();
243 while !bytes.is_empty() {
244 let name_len = take_u8(&mut bytes, "header name length")? as usize;
245 let name = take_str(&mut bytes, name_len, "header name")?;
246 let type_byte = take_u8(&mut bytes, "header type")?;
247 let value = parse_header_value(&mut bytes, type_byte, &name)?;
248 out.push(EventStreamHeader { name, value });
249 }
250 Ok(out)
251}
252
253fn parse_header_value(
254 bytes: &mut &[u8],
255 type_byte: u8,
256 header_name: &str,
257) -> std::result::Result<EventStreamHeaderValue, EventStreamParseError> {
258 match type_byte {
259 TYPE_BOOL_TRUE => Ok(EventStreamHeaderValue::Bool(true)),
260 TYPE_BOOL_FALSE => Ok(EventStreamHeaderValue::Bool(false)),
261 TYPE_BYTE => {
262 let v = take_u8(bytes, "byte value")? as i8;
263 Ok(EventStreamHeaderValue::Byte(v))
264 }
265 TYPE_INT16 => {
266 let v = take_n::<2>(bytes, "int16 value")?;
267 Ok(EventStreamHeaderValue::Int16(i16::from_be_bytes(v)))
268 }
269 TYPE_INT32 => {
270 let v = take_n::<4>(bytes, "int32 value")?;
271 Ok(EventStreamHeaderValue::Int32(i32::from_be_bytes(v)))
272 }
273 TYPE_INT64 => {
274 let v = take_n::<8>(bytes, "int64 value")?;
275 Ok(EventStreamHeaderValue::Int64(i64::from_be_bytes(v)))
276 }
277 TYPE_BYTE_ARRAY => {
278 let len_bytes = take_n::<2>(bytes, "byte array length")?;
279 let len = u16::from_be_bytes(len_bytes) as usize;
280 let payload = take_slice(bytes, len, "byte array value")?;
281 Ok(EventStreamHeaderValue::Bytes(payload.to_vec()))
282 }
283 TYPE_STRING => {
284 let len_bytes = take_n::<2>(bytes, "string length")?;
285 let len = u16::from_be_bytes(len_bytes) as usize;
286 let payload = take_slice(bytes, len, "string value")?;
287 let s = std::str::from_utf8(payload)
288 .map_err(|_| EventStreamParseError::NonUtf8String(header_name.to_owned()))?;
289 Ok(EventStreamHeaderValue::String(s.to_owned()))
290 }
291 TYPE_TIMESTAMP => {
292 let v = take_n::<8>(bytes, "timestamp value")?;
293 Ok(EventStreamHeaderValue::Timestamp(i64::from_be_bytes(v)))
294 }
295 TYPE_UUID => {
296 let v = take_n::<16>(bytes, "uuid value")?;
297 Ok(EventStreamHeaderValue::Uuid(v))
298 }
299 other => Err(EventStreamParseError::UnknownHeaderType(other)),
300 }
301}
302
303fn take_u8(
304 bytes: &mut &[u8],
305 context: &'static str,
306) -> std::result::Result<u8, EventStreamParseError> {
307 if bytes.is_empty() {
308 return Err(EventStreamParseError::Underrun { context });
309 }
310 let v = bytes[0];
311 *bytes = &bytes[1..];
312 Ok(v)
313}
314
315fn take_n<const N: usize>(
316 bytes: &mut &[u8],
317 context: &'static str,
318) -> std::result::Result<[u8; N], EventStreamParseError> {
319 if bytes.len() < N {
320 return Err(EventStreamParseError::Underrun { context });
321 }
322 let mut out = [0u8; N];
323 out.copy_from_slice(&bytes[..N]);
324 *bytes = &bytes[N..];
325 Ok(out)
326}
327
328fn take_slice<'a>(
329 bytes: &mut &'a [u8],
330 n: usize,
331 context: &'static str,
332) -> std::result::Result<&'a [u8], EventStreamParseError> {
333 if bytes.len() < n {
334 return Err(EventStreamParseError::Underrun { context });
335 }
336 let (head, tail) = bytes.split_at(n);
337 *bytes = tail;
338 Ok(head)
339}
340
341fn take_str(
342 bytes: &mut &[u8],
343 n: usize,
344 context: &'static str,
345) -> std::result::Result<String, EventStreamParseError> {
346 let slice = take_slice(bytes, n, context)?;
347 let s = std::str::from_utf8(slice)
348 .map_err(|_| EventStreamParseError::NonUtf8String(context.to_owned()))?;
349 Ok(s.to_owned())
350}
351
352#[doc(hidden)]
359pub fn encode_frame(headers: &[EventStreamHeader], payload: &[u8]) -> Vec<u8> {
360 let mut header_bytes = Vec::new();
361 for h in headers {
362 let name_bytes = h.name.as_bytes();
363 debug_assert!(name_bytes.len() <= u8::MAX as usize);
364 #[allow(clippy::cast_possible_truncation)]
365 let len = name_bytes.len() as u8;
366 header_bytes.push(len);
367 header_bytes.extend_from_slice(name_bytes);
368 match &h.value {
369 EventStreamHeaderValue::Bool(true) => header_bytes.push(TYPE_BOOL_TRUE),
370 EventStreamHeaderValue::Bool(false) => header_bytes.push(TYPE_BOOL_FALSE),
371 EventStreamHeaderValue::Byte(v) => {
372 header_bytes.push(TYPE_BYTE);
373 #[allow(clippy::cast_sign_loss)]
374 header_bytes.push(*v as u8);
375 }
376 EventStreamHeaderValue::Int16(v) => {
377 header_bytes.push(TYPE_INT16);
378 header_bytes.extend_from_slice(&v.to_be_bytes());
379 }
380 EventStreamHeaderValue::Int32(v) => {
381 header_bytes.push(TYPE_INT32);
382 header_bytes.extend_from_slice(&v.to_be_bytes());
383 }
384 EventStreamHeaderValue::Int64(v) => {
385 header_bytes.push(TYPE_INT64);
386 header_bytes.extend_from_slice(&v.to_be_bytes());
387 }
388 EventStreamHeaderValue::Bytes(b) => {
389 header_bytes.push(TYPE_BYTE_ARRAY);
390 #[allow(clippy::cast_possible_truncation)]
391 let len = b.len() as u16;
392 header_bytes.extend_from_slice(&len.to_be_bytes());
393 header_bytes.extend_from_slice(b);
394 }
395 EventStreamHeaderValue::String(s) => {
396 header_bytes.push(TYPE_STRING);
397 let bytes = s.as_bytes();
398 #[allow(clippy::cast_possible_truncation)]
399 let len = bytes.len() as u16;
400 header_bytes.extend_from_slice(&len.to_be_bytes());
401 header_bytes.extend_from_slice(bytes);
402 }
403 EventStreamHeaderValue::Timestamp(v) => {
404 header_bytes.push(TYPE_TIMESTAMP);
405 header_bytes.extend_from_slice(&v.to_be_bytes());
406 }
407 EventStreamHeaderValue::Uuid(u) => {
408 header_bytes.push(TYPE_UUID);
409 header_bytes.extend_from_slice(u);
410 }
411 }
412 }
413 let total_length =
414 u32::try_from(PRELUDE_LEN + header_bytes.len() + payload.len() + MESSAGE_CRC_LEN)
415 .expect("frame fits in u32");
416 let headers_length = u32::try_from(header_bytes.len()).expect("headers fit in u32");
417
418 let mut out = Vec::with_capacity(total_length as usize);
419 out.extend_from_slice(&total_length.to_be_bytes());
420 out.extend_from_slice(&headers_length.to_be_bytes());
421 let prelude_crc = crc32fast::hash(&out[..8]);
422 out.extend_from_slice(&prelude_crc.to_be_bytes());
423 out.extend_from_slice(&header_bytes);
424 out.extend_from_slice(payload);
425 let message_crc = crc32fast::hash(&out);
426 out.extend_from_slice(&message_crc.to_be_bytes());
427 out
428}