Skip to main content

entelix_cloud/bedrock/
event_stream.rs

1//! AWS `vnd.amazon.eventstream` binary frame decoder.
2//!
3//! Wire format (all integers big-endian):
4//!
5//! ```text
6//! ┌───────────────────────┐
7//! │ total_length      u32 │  full message bytes incl. length fields
8//! │ headers_length    u32 │  header section bytes
9//! │ prelude_crc32     u32 │  CRC-32 over the two length fields above
10//! ├───────────────────────┤
11//! │ headers...            │  zero or more typed headers
12//! │ payload...            │  arbitrary bytes
13//! ├───────────────────────┤
14//! │ message_crc32     u32 │  CRC-32 over every byte before this one
15//! └───────────────────────┘
16//! ```
17//!
18//! Each header is `[name_len: u8][name: utf-8][type: u8][value: per-type]`.
//! Ten wire type codes (0..=9) map onto nine value kinds — bool (codes 0
//! and 1) / byte / signed integers / bytes / string / timestamp / uuid —
//! see [`EventStreamHeaderValue`].
21//!
22//! The decoder is incremental: feed bytes via [`EventStreamDecoder::push`]
23//! and pull complete frames with [`EventStreamDecoder::next_frame`].
24//! Implementation is self-contained — no AWS SDK dep.
25
26use std::collections::HashMap;
27
28use bytes::{Buf, BytesMut};
29use thiserror::Error;
30
/// Size of the fixed prelude: three big-endian `u32` fields
/// (`total_length`, `headers_length`, `prelude_crc32`).
const PRELUDE_LEN: usize = 3 * std::mem::size_of::<u32>();
/// Size of the trailing `message_crc32` field.
const MESSAGE_CRC_LEN: usize = std::mem::size_of::<u32>();
/// Smallest legal frame: prelude plus trailing CRC, with no headers and an
/// empty payload.
const MIN_FRAME_LEN: usize = PRELUDE_LEN + MESSAGE_CRC_LEN;

// Header value type codes, i.e. the single byte that follows each header
// name on the wire.
const TYPE_BOOL_TRUE: u8 = 0x00;
const TYPE_BOOL_FALSE: u8 = 0x01;
const TYPE_BYTE: u8 = 0x02;
const TYPE_INT16: u8 = 0x03;
const TYPE_INT32: u8 = 0x04;
const TYPE_INT64: u8 = 0x05;
const TYPE_BYTE_ARRAY: u8 = 0x06;
const TYPE_STRING: u8 = 0x07;
const TYPE_TIMESTAMP: u8 = 0x08;
const TYPE_UUID: u8 = 0x09;
45
/// Typed value carried by a single event-stream header.
///
/// Each variant lists the wire type code(s) it is decoded from; multi-byte
/// integers are big-endian on the wire.
#[non_exhaustive]
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum EventStreamHeaderValue {
    /// Boolean with no value bytes: type code 0 is `true`, 1 is `false`.
    Bool(bool),
    /// Signed 8-bit integer — type code 2.
    Byte(i8),
    /// Signed 16-bit integer — type code 3.
    Int16(i16),
    /// Signed 32-bit integer — type code 4.
    Int32(i32),
    /// Signed 64-bit integer — type code 5.
    Int64(i64),
    /// Opaque bytes preceded by a `u16` length — type code 6.
    Bytes(Vec<u8>),
    /// UTF-8 text preceded by a `u16` length — type code 7.
    String(String),
    /// Milliseconds since the Unix epoch, as an `i64` — type code 8.
    Timestamp(i64),
    /// Raw 16-byte UUID — type code 9.
    Uuid([u8; 16]),
}
69
70/// One header `(name, value)` pair.
71#[derive(Clone, Debug, PartialEq, Eq)]
72pub struct EventStreamHeader {
73    /// Header name — UTF-8, max 255 bytes per AWS spec.
74    pub name: String,
75    /// Typed value.
76    pub value: EventStreamHeaderValue,
77}
78
79/// One decoded frame.
80#[derive(Clone, Debug, PartialEq, Eq)]
81pub struct EventStreamFrame {
82    /// Headers in the order they appeared on the wire.
83    pub headers: Vec<EventStreamHeader>,
84    /// Header lookup by name — O(1) access for callers that only
85    /// care about specific keys.
86    pub header_index: HashMap<String, EventStreamHeaderValue>,
87    /// Payload bytes (typically a JSON event for Bedrock streams).
88    pub payload: Vec<u8>,
89}
90
91impl EventStreamFrame {
92    /// Convenience: pull the `:event-type` header value as a string,
93    /// matching AWS service convention.
94    pub fn event_type(&self) -> Option<&str> {
95        match self.header_index.get(":event-type") {
96            Some(EventStreamHeaderValue::String(s)) => Some(s),
97            _ => None,
98        }
99    }
100}
101
102/// Errors from frame parsing.
103#[derive(Debug, Error)]
104#[non_exhaustive]
105pub enum EventStreamParseError {
106    /// `total_length` field claims a frame size below the protocol
107    /// minimum.
108    #[error("frame too short: total_length={0} (min {MIN_FRAME_LEN})")]
109    FrameTooShort(u32),
110
111    /// CRC over the prelude (first 8 bytes) did not match the
112    /// stored value.
113    #[error("prelude CRC mismatch (computed {computed:#010x}, header {expected:#010x})")]
114    PreludeCrcMismatch {
115        /// CRC value the decoder computed locally.
116        computed: u32,
117        /// CRC value carried in the prelude.
118        expected: u32,
119    },
120
121    /// CRC over the full message did not match the trailing value.
122    #[error("message CRC mismatch (computed {computed:#010x}, trailer {expected:#010x})")]
123    MessageCrcMismatch {
124        /// CRC value the decoder computed locally.
125        computed: u32,
126        /// CRC value carried in the message trailer.
127        expected: u32,
128    },
129
130    /// A header value type byte was outside the documented `0..=9`
131    /// range.
132    #[error("unknown header value type: {0}")]
133    UnknownHeaderType(u8),
134
135    /// A length-prefixed field claimed more bytes than the frame
136    /// supplied.
137    #[error("frame underrun reading {context}")]
138    Underrun {
139        /// Human-readable site that ran out of bytes (e.g. "string
140        /// value", "header name").
141        context: &'static str,
142    },
143
144    /// A `String` header could not be decoded as UTF-8.
145    #[error("non-UTF-8 string in header {0}")]
146    NonUtf8String(String),
147}
148
149/// Incremental binary frame decoder.
150#[derive(Default)]
151pub struct EventStreamDecoder {
152    buf: BytesMut,
153}
154
155impl EventStreamDecoder {
156    /// Empty decoder.
157    pub fn new() -> Self {
158        Self::default()
159    }
160
161    /// Append `chunk` to the internal buffer.
162    pub fn push(&mut self, chunk: &[u8]) {
163        self.buf.extend_from_slice(chunk);
164    }
165
166    /// Try to parse the next complete frame. Returns `Ok(None)` when
167    /// the buffer does not yet contain a full frame, `Ok(Some(_))` on
168    /// success, or `Err(_)` when the next frame's prelude is
169    /// malformed (in which case the decoder's state is corrupted —
170    /// callers should treat the stream as failed).
171    pub fn next_frame(
172        &mut self,
173    ) -> std::result::Result<Option<EventStreamFrame>, EventStreamParseError> {
174        if self.buf.len() < PRELUDE_LEN {
175            return Ok(None);
176        }
177        let total_length = u32::from_be_bytes([self.buf[0], self.buf[1], self.buf[2], self.buf[3]]);
178        let total_usize = total_length as usize;
179        if total_usize < MIN_FRAME_LEN {
180            return Err(EventStreamParseError::FrameTooShort(total_length));
181        }
182        if self.buf.len() < total_usize {
183            return Ok(None);
184        }
185        let headers_length =
186            u32::from_be_bytes([self.buf[4], self.buf[5], self.buf[6], self.buf[7]]) as usize;
187        let prelude_crc_expected =
188            u32::from_be_bytes([self.buf[8], self.buf[9], self.buf[10], self.buf[11]]);
189        let prelude_crc_computed = crc32fast::hash(&self.buf[..8]);
190        if prelude_crc_computed != prelude_crc_expected {
191            return Err(EventStreamParseError::PreludeCrcMismatch {
192                computed: prelude_crc_computed,
193                expected: prelude_crc_expected,
194            });
195        }
196        // Verify message CRC over [0, total - 4).
197        let message_body_end = total_usize - MESSAGE_CRC_LEN;
198        let message_crc_expected = u32::from_be_bytes([
199            self.buf[message_body_end],
200            self.buf[message_body_end + 1],
201            self.buf[message_body_end + 2],
202            self.buf[message_body_end + 3],
203        ]);
204        let message_crc_computed = crc32fast::hash(&self.buf[..message_body_end]);
205        if message_crc_computed != message_crc_expected {
206            return Err(EventStreamParseError::MessageCrcMismatch {
207                computed: message_crc_computed,
208                expected: message_crc_expected,
209            });
210        }
211        // Slice headers and payload.
212        let header_start = PRELUDE_LEN;
213        let header_end = header_start + headers_length;
214        let payload_start = header_end;
215        let payload_end = message_body_end;
216
217        let headers = parse_headers(&self.buf[header_start..header_end])?;
218        let payload = self.buf[payload_start..payload_end].to_vec();
219        let mut header_index = HashMap::with_capacity(headers.len());
220        for h in &headers {
221            header_index.insert(h.name.clone(), h.value.clone());
222        }
223        // Drop the consumed frame from the buffer.
224        self.buf.advance(total_usize);
225        Ok(Some(EventStreamFrame {
226            headers,
227            header_index,
228            payload,
229        }))
230    }
231
232    /// True when the internal buffer has bytes that have not yet
233    /// completed a frame.
234    pub fn has_residual(&self) -> bool {
235        !self.buf.is_empty()
236    }
237}
238
239fn parse_headers(
240    mut bytes: &[u8],
241) -> std::result::Result<Vec<EventStreamHeader>, EventStreamParseError> {
242    let mut out = Vec::new();
243    while !bytes.is_empty() {
244        let name_len = take_u8(&mut bytes, "header name length")? as usize;
245        let name = take_str(&mut bytes, name_len, "header name")?;
246        let type_byte = take_u8(&mut bytes, "header type")?;
247        let value = parse_header_value(&mut bytes, type_byte, &name)?;
248        out.push(EventStreamHeader { name, value });
249    }
250    Ok(out)
251}
252
253fn parse_header_value(
254    bytes: &mut &[u8],
255    type_byte: u8,
256    header_name: &str,
257) -> std::result::Result<EventStreamHeaderValue, EventStreamParseError> {
258    match type_byte {
259        TYPE_BOOL_TRUE => Ok(EventStreamHeaderValue::Bool(true)),
260        TYPE_BOOL_FALSE => Ok(EventStreamHeaderValue::Bool(false)),
261        TYPE_BYTE => {
262            let v = take_u8(bytes, "byte value")? as i8;
263            Ok(EventStreamHeaderValue::Byte(v))
264        }
265        TYPE_INT16 => {
266            let v = take_n::<2>(bytes, "int16 value")?;
267            Ok(EventStreamHeaderValue::Int16(i16::from_be_bytes(v)))
268        }
269        TYPE_INT32 => {
270            let v = take_n::<4>(bytes, "int32 value")?;
271            Ok(EventStreamHeaderValue::Int32(i32::from_be_bytes(v)))
272        }
273        TYPE_INT64 => {
274            let v = take_n::<8>(bytes, "int64 value")?;
275            Ok(EventStreamHeaderValue::Int64(i64::from_be_bytes(v)))
276        }
277        TYPE_BYTE_ARRAY => {
278            let len_bytes = take_n::<2>(bytes, "byte array length")?;
279            let len = u16::from_be_bytes(len_bytes) as usize;
280            let payload = take_slice(bytes, len, "byte array value")?;
281            Ok(EventStreamHeaderValue::Bytes(payload.to_vec()))
282        }
283        TYPE_STRING => {
284            let len_bytes = take_n::<2>(bytes, "string length")?;
285            let len = u16::from_be_bytes(len_bytes) as usize;
286            let payload = take_slice(bytes, len, "string value")?;
287            let s = std::str::from_utf8(payload)
288                .map_err(|_| EventStreamParseError::NonUtf8String(header_name.to_owned()))?;
289            Ok(EventStreamHeaderValue::String(s.to_owned()))
290        }
291        TYPE_TIMESTAMP => {
292            let v = take_n::<8>(bytes, "timestamp value")?;
293            Ok(EventStreamHeaderValue::Timestamp(i64::from_be_bytes(v)))
294        }
295        TYPE_UUID => {
296            let v = take_n::<16>(bytes, "uuid value")?;
297            Ok(EventStreamHeaderValue::Uuid(v))
298        }
299        other => Err(EventStreamParseError::UnknownHeaderType(other)),
300    }
301}
302
303fn take_u8(
304    bytes: &mut &[u8],
305    context: &'static str,
306) -> std::result::Result<u8, EventStreamParseError> {
307    if bytes.is_empty() {
308        return Err(EventStreamParseError::Underrun { context });
309    }
310    let v = bytes[0];
311    *bytes = &bytes[1..];
312    Ok(v)
313}
314
315fn take_n<const N: usize>(
316    bytes: &mut &[u8],
317    context: &'static str,
318) -> std::result::Result<[u8; N], EventStreamParseError> {
319    if bytes.len() < N {
320        return Err(EventStreamParseError::Underrun { context });
321    }
322    let mut out = [0u8; N];
323    out.copy_from_slice(&bytes[..N]);
324    *bytes = &bytes[N..];
325    Ok(out)
326}
327
328fn take_slice<'a>(
329    bytes: &mut &'a [u8],
330    n: usize,
331    context: &'static str,
332) -> std::result::Result<&'a [u8], EventStreamParseError> {
333    if bytes.len() < n {
334        return Err(EventStreamParseError::Underrun { context });
335    }
336    let (head, tail) = bytes.split_at(n);
337    *bytes = tail;
338    Ok(head)
339}
340
341fn take_str(
342    bytes: &mut &[u8],
343    n: usize,
344    context: &'static str,
345) -> std::result::Result<String, EventStreamParseError> {
346    let slice = take_slice(bytes, n, context)?;
347    let s = std::str::from_utf8(slice)
348        .map_err(|_| EventStreamParseError::NonUtf8String(context.to_owned()))?;
349    Ok(s.to_owned())
350}
351
// ── Frame encoder (not used by the decoder; public but `#[doc(hidden)]`
// so integration tests and downstream tooling can mint fixture frames) ──
354
355/// Encode a single frame into AWS `vnd.amazon.eventstream` bytes.
356/// Used by tests and any caller that needs to mint synthetic frames
357/// for fixtures.
358#[doc(hidden)]
359pub fn encode_frame(headers: &[EventStreamHeader], payload: &[u8]) -> Vec<u8> {
360    let mut header_bytes = Vec::new();
361    for h in headers {
362        let name_bytes = h.name.as_bytes();
363        debug_assert!(name_bytes.len() <= u8::MAX as usize);
364        #[allow(clippy::cast_possible_truncation)]
365        let len = name_bytes.len() as u8;
366        header_bytes.push(len);
367        header_bytes.extend_from_slice(name_bytes);
368        match &h.value {
369            EventStreamHeaderValue::Bool(true) => header_bytes.push(TYPE_BOOL_TRUE),
370            EventStreamHeaderValue::Bool(false) => header_bytes.push(TYPE_BOOL_FALSE),
371            EventStreamHeaderValue::Byte(v) => {
372                header_bytes.push(TYPE_BYTE);
373                #[allow(clippy::cast_sign_loss)]
374                header_bytes.push(*v as u8);
375            }
376            EventStreamHeaderValue::Int16(v) => {
377                header_bytes.push(TYPE_INT16);
378                header_bytes.extend_from_slice(&v.to_be_bytes());
379            }
380            EventStreamHeaderValue::Int32(v) => {
381                header_bytes.push(TYPE_INT32);
382                header_bytes.extend_from_slice(&v.to_be_bytes());
383            }
384            EventStreamHeaderValue::Int64(v) => {
385                header_bytes.push(TYPE_INT64);
386                header_bytes.extend_from_slice(&v.to_be_bytes());
387            }
388            EventStreamHeaderValue::Bytes(b) => {
389                header_bytes.push(TYPE_BYTE_ARRAY);
390                #[allow(clippy::cast_possible_truncation)]
391                let len = b.len() as u16;
392                header_bytes.extend_from_slice(&len.to_be_bytes());
393                header_bytes.extend_from_slice(b);
394            }
395            EventStreamHeaderValue::String(s) => {
396                header_bytes.push(TYPE_STRING);
397                let bytes = s.as_bytes();
398                #[allow(clippy::cast_possible_truncation)]
399                let len = bytes.len() as u16;
400                header_bytes.extend_from_slice(&len.to_be_bytes());
401                header_bytes.extend_from_slice(bytes);
402            }
403            EventStreamHeaderValue::Timestamp(v) => {
404                header_bytes.push(TYPE_TIMESTAMP);
405                header_bytes.extend_from_slice(&v.to_be_bytes());
406            }
407            EventStreamHeaderValue::Uuid(u) => {
408                header_bytes.push(TYPE_UUID);
409                header_bytes.extend_from_slice(u);
410            }
411        }
412    }
413    let total_length =
414        u32::try_from(PRELUDE_LEN + header_bytes.len() + payload.len() + MESSAGE_CRC_LEN)
415            .expect("frame fits in u32");
416    let headers_length = u32::try_from(header_bytes.len()).expect("headers fit in u32");
417
418    let mut out = Vec::with_capacity(total_length as usize);
419    out.extend_from_slice(&total_length.to_be_bytes());
420    out.extend_from_slice(&headers_length.to_be_bytes());
421    let prelude_crc = crc32fast::hash(&out[..8]);
422    out.extend_from_slice(&prelude_crc.to_be_bytes());
423    out.extend_from_slice(&header_bytes);
424    out.extend_from_slice(payload);
425    let message_crc = crc32fast::hash(&out);
426    out.extend_from_slice(&message_crc.to_be_bytes());
427    out
428}