#![cfg(feature = "aws")]
#![allow(
clippy::unwrap_used,
clippy::indexing_slicing,
clippy::cast_possible_truncation,
clippy::cast_sign_loss,
clippy::cast_lossless,
clippy::cast_possible_wrap,
clippy::checked_conversions,
clippy::needless_pass_by_value,
clippy::redundant_clone
)]
use entelix_cloud::bedrock::{
EventStreamDecoder, EventStreamFrame, EventStreamHeader, EventStreamHeaderValue,
EventStreamParseError, encode_frame,
};
fn encode(headers: Vec<EventStreamHeader>, payload: &[u8]) -> Vec<u8> {
encode_frame(&headers, payload)
}
fn header(name: &str, value: EventStreamHeaderValue) -> EventStreamHeader {
EventStreamHeader {
name: name.to_owned(),
value,
}
}
#[test]
fn round_trip_empty_payload_no_headers() {
let bytes = encode(Vec::new(), &[]);
let mut dec = EventStreamDecoder::new();
dec.push(&bytes);
let frame = dec.next_frame().unwrap().unwrap();
assert!(frame.headers.is_empty());
assert!(frame.payload.is_empty());
}
#[test]
fn round_trip_all_nine_header_types() {
let headers = vec![
header("bool-true", EventStreamHeaderValue::Bool(true)),
header("bool-false", EventStreamHeaderValue::Bool(false)),
header("byte", EventStreamHeaderValue::Byte(-7)),
header("int16", EventStreamHeaderValue::Int16(-1234)),
header("int32", EventStreamHeaderValue::Int32(0x1122_3344)),
header(
"int64",
EventStreamHeaderValue::Int64(-0x0102_0304_0506_0708),
),
header(
"bytes",
EventStreamHeaderValue::Bytes(vec![0xde, 0xad, 0xbe, 0xef]),
),
header("string", EventStreamHeaderValue::String("hello".into())),
header("ts", EventStreamHeaderValue::Timestamp(1_700_000_000_000)),
header(
"uuid",
EventStreamHeaderValue::Uuid([
0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab,
0xcd, 0xef,
]),
),
];
let bytes = encode(headers.clone(), b"payload");
let mut dec = EventStreamDecoder::new();
dec.push(&bytes);
let frame = dec.next_frame().unwrap().unwrap();
assert_eq!(frame.headers, headers);
assert_eq!(frame.payload, b"payload");
assert_eq!(
frame.header_index.get("string"),
Some(&EventStreamHeaderValue::String("hello".into()))
);
}
#[test]
fn frame_split_across_chunks_assembles() {
let bytes = encode(
vec![header(
":event-type",
EventStreamHeaderValue::String("contentBlockDelta".into()),
)],
br#"{"delta": "hi"}"#,
);
let mid = bytes.len() / 2;
let mut dec = EventStreamDecoder::new();
dec.push(&bytes[..mid]);
assert!(dec.next_frame().unwrap().is_none());
dec.push(&bytes[mid..]);
let frame = dec.next_frame().unwrap().unwrap();
assert_eq!(frame.event_type(), Some("contentBlockDelta"));
assert_eq!(frame.payload, br#"{"delta": "hi"}"#);
}
#[test]
fn two_back_to_back_frames_decode() {
let frame_a = encode(vec![], b"first");
let frame_b = encode(vec![], b"second");
let mut combined = frame_a.clone();
combined.extend_from_slice(&frame_b);
let mut dec = EventStreamDecoder::new();
dec.push(&combined);
let f1 = dec.next_frame().unwrap().unwrap();
let f2 = dec.next_frame().unwrap().unwrap();
assert_eq!(f1.payload, b"first");
assert_eq!(f2.payload, b"second");
assert!(dec.next_frame().unwrap().is_none());
}
#[test]
fn corrupted_prelude_crc_surfaces_error() {
let mut bytes = encode(vec![], b"x");
bytes[8] ^= 0xff;
let mut dec = EventStreamDecoder::new();
dec.push(&bytes);
let err = dec.next_frame().unwrap_err();
assert!(matches!(
err,
EventStreamParseError::PreludeCrcMismatch { .. }
));
}
#[test]
fn corrupted_message_crc_surfaces_error() {
let mut bytes = encode(vec![], b"x");
let last = bytes.len() - 1;
bytes[last] ^= 0xff;
let mut dec = EventStreamDecoder::new();
dec.push(&bytes);
let err = dec.next_frame().unwrap_err();
assert!(matches!(
err,
EventStreamParseError::MessageCrcMismatch { .. }
));
}
#[test]
fn frame_too_short_in_total_length_field_rejected() {
let mut bytes = Vec::new();
bytes.extend_from_slice(&8u32.to_be_bytes()); bytes.extend_from_slice(&0u32.to_be_bytes()); let prelude_crc = crc32fast::hash(&bytes[..8]);
bytes.extend_from_slice(&prelude_crc.to_be_bytes());
let mut dec = EventStreamDecoder::new();
dec.push(&bytes);
let err = dec.next_frame().unwrap_err();
assert!(matches!(err, EventStreamParseError::FrameTooShort(8)));
}
#[test]
fn unknown_header_type_rejected() {
let header_name = b"x";
let bogus_type: u8 = 200;
let mut header_bytes = Vec::new();
header_bytes.push(header_name.len() as u8);
header_bytes.extend_from_slice(header_name);
header_bytes.push(bogus_type);
let payload = b"";
let total_length = (12 + header_bytes.len() + payload.len() + 4) as u32;
let headers_length = header_bytes.len() as u32;
let mut bytes = Vec::new();
bytes.extend_from_slice(&total_length.to_be_bytes());
bytes.extend_from_slice(&headers_length.to_be_bytes());
let prelude_crc = crc32fast::hash(&bytes[..8]);
bytes.extend_from_slice(&prelude_crc.to_be_bytes());
bytes.extend_from_slice(&header_bytes);
bytes.extend_from_slice(payload);
let message_crc = crc32fast::hash(&bytes);
bytes.extend_from_slice(&message_crc.to_be_bytes());
let mut dec = EventStreamDecoder::new();
dec.push(&bytes);
let err = dec.next_frame().unwrap_err();
assert!(matches!(err, EventStreamParseError::UnknownHeaderType(200)));
}
#[test]
fn event_type_helper_returns_none_when_absent() {
let frame = EventStreamFrame {
headers: Vec::new(),
header_index: std::collections::HashMap::new(),
payload: Vec::new(),
};
assert!(frame.event_type().is_none());
}