use serde_json::Value;
/// Type tag byte that `append_message` writes between a header's name and its
/// length-prefixed value. Every header value encoded in this file is a string.
const HEADER_TYPE_STRING: u8 = 0x07;
/// Computes the CRC-32 checksum of `data`.
///
/// Uses the reflected polynomial `0xEDB88320` with an all-ones initial
/// register and a final bitwise inversion, processing one bit at a time.
/// An empty slice yields `0`.
fn crc32(data: &[u8]) -> u32 {
    const REFLECTED_POLY: u32 = 0xedb8_8320;

    let register = data.iter().fold(u32::MAX, |acc, &byte| {
        // Mixing the whole byte into the low bits up front is equivalent to
        // comparing one input bit against the register's low bit per step.
        (0..8).fold(acc ^ u32::from(byte), |reg, _| {
            let shifted = reg >> 1;
            if reg & 1 == 1 {
                shifted ^ REFLECTED_POLY
            } else {
                shifted
            }
        })
    });
    !register
}
/// A single name/value header attached to an event-stream message.
///
/// `append_message` writes both fields as length-prefixed byte strings.
pub struct EventHeader {
/// Header name; its byte length is written as a single `u8`, so it must be
/// at most 255 bytes.
pub name: String,
/// Header value; its byte length is written as a big-endian `u16`, so it
/// must be at most 65535 bytes.
pub value: String,
}
/// Appends one framed event-stream message to `out`.
///
/// The bytes written, in order (all integers big-endian):
///
/// 1. total message length (`u32`), counting every field listed here
/// 2. length of the headers section (`u32`)
/// 3. CRC-32 of fields 1-2 (`u32`)
/// 4. the headers section: for each header a `u8` name length, the name
///    bytes, the `HEADER_TYPE_STRING` tag, a `u16` value length and the
///    value bytes
/// 5. `payload`, verbatim
/// 6. CRC-32 of fields 1-5 (`u32`)
///
/// Bytes already present in `out` are left untouched.
///
/// # Panics
///
/// Panics, in every build profile and before anything is written to `out`, if
/// the message cannot be represented on the wire:
///
/// * a header name is longer than 255 bytes,
/// * a header value is longer than 65535 bytes, or
/// * the complete message would be longer than `u32::MAX` bytes.
///
/// These limits used to be checked only in debug builds; a release build
/// would write a truncated length followed by the full bytes, producing a
/// frame no decoder can parse.
pub fn append_message(out: &mut Vec<u8>, headers: &[EventHeader], payload: &[u8]) {
    // Two u32 length fields plus the prelude CRC.
    const PRELUDE_LEN: usize = 12;
    // Trailing CRC over the whole message.
    const MESSAGE_CRC_LEN: usize = 4;

    let mut hb: Vec<u8> = Vec::new();
    for h in headers {
        // The asserts make the narrowing casts below lossless.
        assert!(h.name.len() <= u8::MAX as usize, "header name too long");
        assert!(h.value.len() <= u16::MAX as usize, "header value too long");
        hb.push(h.name.len() as u8);
        hb.extend_from_slice(h.name.as_bytes());
        hb.push(HEADER_TYPE_STRING);
        hb.extend_from_slice(&(h.value.len() as u16).to_be_bytes());
        hb.extend_from_slice(h.value.as_bytes());
    }

    // Checked arithmetic: a wrapped or truncated length field would corrupt
    // the frame silently.
    let total = PRELUDE_LEN
        .checked_add(hb.len())
        .and_then(|n| n.checked_add(payload.len()))
        .and_then(|n| n.checked_add(MESSAGE_CRC_LEN))
        .filter(|&n| n <= u32::MAX as usize)
        .expect("event-stream message too long");
    // `hb.len() <= total <= u32::MAX`, so both casts are lossless.
    let total_len = total as u32;
    let headers_len = hb.len() as u32;

    out.reserve(total);
    let start = out.len();
    out.extend_from_slice(&total_len.to_be_bytes());
    out.extend_from_slice(&headers_len.to_be_bytes());
    let prelude_crc = crc32(&out[start..start + 8]);
    out.extend_from_slice(&prelude_crc.to_be_bytes());
    out.extend_from_slice(&hb);
    out.extend_from_slice(payload);
    // The message CRC covers the prelude (including its CRC), headers and payload.
    let msg_crc = crc32(&out[start..]);
    out.extend_from_slice(&msg_crc.to_be_bytes());
}
/// Key that `try_encode` looks up on the top-level JSON object; the value
/// stored under it is expected to be an array of frame descriptions.
pub const MARKER: &str = "__awsim_eventstream__";
/// Encodes the event-stream frames described by `value`, if it carries any.
///
/// Returns `None` unless `value` is a JSON object whose [`MARKER`] key holds
/// an array. Otherwise every array element that is a JSON object is encoded
/// as one message, in order, and the concatenated bytes are returned; elements
/// that are not objects are skipped.
///
/// Each frame object is read as follows:
///
/// * `"headers"` (optional object): every entry with a string value becomes a
///   header. Entries with non-string values are skipped, and so are entries
///   whose name exceeds 255 bytes or whose value exceeds 65535 bytes, because
///   the one- and two-byte length prefixes of the wire format cannot describe
///   them.
/// * `"payload"` (optional): serialized to JSON bytes. A missing payload, or
///   one that fails to serialize, yields an empty payload.
pub fn try_encode(value: &Value) -> Option<Vec<u8>> {
    // Largest byte lengths that fit the u8 / u16 header length prefixes.
    const MAX_NAME_LEN: usize = u8::MAX as usize;
    const MAX_VALUE_LEN: usize = u16::MAX as usize;

    let frames = value.as_object()?.get(MARKER)?.as_array()?;
    let mut out: Vec<u8> = Vec::new();
    for frame in frames {
        let Some(obj) = frame.as_object() else {
            continue;
        };
        let mut headers: Vec<EventHeader> = Vec::new();
        if let Some(hmap) = obj.get("headers").and_then(Value::as_object) {
            for (k, v) in hmap {
                let Some(s) = v.as_str() else {
                    continue;
                };
                // The JSON is caller-supplied; never hand the encoder a header
                // it cannot length-prefix.
                if k.len() > MAX_NAME_LEN || s.len() > MAX_VALUE_LEN {
                    continue;
                }
                headers.push(EventHeader {
                    name: k.clone(),
                    value: s.to_string(),
                });
            }
        }
        let payload_bytes = obj
            .get("payload")
            .and_then(|p| serde_json::to_vec(p).ok())
            .unwrap_or_default();
        append_message(&mut out, &headers, &payload_bytes);
    }
    Some(out)
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Reads the big-endian `u32` stored at `bytes[at..at + 4]`.
    fn be_u32(bytes: &[u8], at: usize) -> u32 {
        u32::from_be_bytes([bytes[at], bytes[at + 1], bytes[at + 2], bytes[at + 3]])
    }

    #[test]
    fn crc32_known_vectors() {
        assert_eq!(crc32(b"abc"), 0x3524_41C2);
        assert_eq!(crc32(b""), 0);
        // Standard CRC-32 check value.
        assert_eq!(crc32(b"123456789"), 0xCBF4_3926);
    }

    #[test]
    fn round_trip_one_message() {
        let value = json!({
            MARKER: [
                {
                    "headers": {
                        ":event-type": "messageStart",
                        ":message-type": "event",
                        ":content-type": "application/json",
                    },
                    "payload": {"role": "assistant"}
                }
            ]
        });
        let bytes = try_encode(&value).unwrap();
        assert!(bytes.len() > 20);

        // Prelude: total length, headers length, CRC over both.
        let total_len = be_u32(&bytes, 0) as usize;
        assert_eq!(total_len, bytes.len());
        // Per header: u8 name length + name + type tag + u16 value length + value.
        let expected_headers_len: usize = [
            (":event-type", "messageStart"),
            (":message-type", "event"),
            (":content-type", "application/json"),
        ]
        .iter()
        .map(|(name, value)| 1 + name.len() + 1 + 2 + value.len())
        .sum();
        let headers_len = be_u32(&bytes, 4) as usize;
        assert_eq!(headers_len, expected_headers_len);
        assert_eq!(be_u32(&bytes, 8), crc32(&bytes[..8]));

        // Payload sits between the headers and the trailing message CRC.
        let body_end = bytes.len() - 4;
        assert_eq!(&bytes[12 + headers_len..body_end], &br#"{"role":"assistant"}"#[..]);
        assert_eq!(be_u32(&bytes, body_end), crc32(&bytes[..body_end]));
    }

    #[test]
    fn empty_frame_list_encodes_to_no_bytes() {
        let value = json!({ MARKER: [] });
        assert_eq!(try_encode(&value), Some(Vec::new()));
    }

    #[test]
    fn no_marker_returns_none() {
        let value = json!({ "foo": "bar" });
        assert!(try_encode(&value).is_none());
    }
}