#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EventFrame {
pub event_type: String,
pub content_type: String,
pub payload: Vec<u8>,
}
pub fn parse_frames(buffer: &mut Vec<u8>) -> Vec<EventFrame> {
let mut frames = Vec::new();
while buffer.len() >= MIN_FRAME_SIZE {
let total_len = read_u32_be(&buffer[0..4]);
if total_len < (PRELUDE_LEN + 4) as u32 {
buffer.drain(..4); continue;
}
if (total_len as usize) > buffer.len() {
break; }
let frame_bytes: Vec<u8> = buffer.drain(..total_len as usize).collect();
if let Some(frame) = parse_single_frame(&frame_bytes) {
frames.push(frame);
}
}
frames
}
const MIN_FRAME_SIZE: usize = 16;
const PRELUDE_LEN: usize = 12;
fn parse_single_frame(data: &[u8]) -> Option<EventFrame> {
if data.len() < PRELUDE_LEN + 4 {
return None;
}
let total_len = read_u32_be(&data[0..4]) as usize;
let headers_len = read_u32_be(&data[4..8]) as usize;
if total_len != data.len() {
return None;
}
let headers_start = PRELUDE_LEN;
let headers_end = headers_start + headers_len;
if headers_end > data.len() - 4 {
return None; }
let headers_data = &data[headers_start..headers_end];
let payload = &data[headers_end..data.len() - 4];
let mut event_type = String::new();
let mut content_type = String::new();
let mut pos = 0;
while pos < headers_data.len() {
let name_len = headers_data[pos] as usize;
pos += 1;
if pos + name_len >= headers_data.len() {
break;
}
let name = std::str::from_utf8(&headers_data[pos..pos + name_len]).unwrap_or("");
pos += name_len;
let header_type = headers_data[pos];
pos += 1;
if header_type == 7 {
if pos + 2 > headers_data.len() {
break;
}
let val_len = read_u16_be(&headers_data[pos..pos + 2]) as usize;
pos += 2;
if pos + val_len > headers_data.len() {
break;
}
let value = std::str::from_utf8(&headers_data[pos..pos + val_len]).unwrap_or("");
pos += val_len;
match name {
":event-type" => event_type = value.to_string(),
":content-type" => content_type = value.to_string(),
_ => {}
}
} else {
pos += skip_header_value_len(header_type, &headers_data[pos..]).unwrap_or(0);
}
}
Some(EventFrame {
event_type,
content_type,
payload: payload.to_vec(),
})
}
fn skip_header_value_len(header_type: u8, rest: &[u8]) -> Option<usize> {
match header_type {
0 | 1 => Some(1), 2 => Some(1), 3 => Some(2), 4 => Some(4), 5 => Some(8), 6 => {
if rest.len() < 2 {
return None;
}
let len = read_u16_be(&rest[0..2]) as usize;
Some(2 + len)
}
8 => Some(8), 9 => Some(1), _ => Some(0),
}
}
fn read_u32_be(bytes: &[u8]) -> u32 {
u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])
}
fn read_u16_be(bytes: &[u8]) -> u16 {
u16::from_be_bytes([bytes[0], bytes[1]])
}
pub fn build_test_frame(event_type: &str, content_type: &str, payload: &[u8]) -> Vec<u8> {
let mut headers = Vec::new();
let event_type_name = b":event-type";
headers.push(event_type_name.len() as u8);
headers.extend_from_slice(event_type_name);
headers.push(7); let et_len = event_type.len() as u16;
headers.extend_from_slice(&et_len.to_be_bytes());
headers.extend_from_slice(event_type.as_bytes());
let ct_name = b":content-type";
headers.push(ct_name.len() as u8);
headers.extend_from_slice(ct_name);
headers.push(7);
let ct_len = content_type.len() as u16;
headers.extend_from_slice(&ct_len.to_be_bytes());
headers.extend_from_slice(content_type.as_bytes());
let headers_len = headers.len() as u32;
let total_len: u32 = 4 + 4 + 4 + headers_len + payload.len() as u32 + 4;
let mut frame = Vec::with_capacity(total_len as usize);
frame.extend_from_slice(&total_len.to_be_bytes());
frame.extend_from_slice(&headers_len.to_be_bytes());
frame.extend_from_slice(&0u32.to_be_bytes());
frame.extend_from_slice(&headers);
frame.extend_from_slice(payload);
frame.extend_from_slice(&0u32.to_be_bytes());
frame
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_single_message_start_frame() {
let payload = br#"{"role":"assistant"}"#;
let frame_bytes = build_test_frame("messageStart", "application/json", payload);
let mut buffer = frame_bytes.clone();
let frames = parse_frames(&mut buffer);
assert_eq!(frames.len(), 1);
assert_eq!(frames[0].event_type, "messageStart");
assert_eq!(frames[0].content_type, "application/json");
assert_eq!(frames[0].payload, payload);
assert!(buffer.is_empty());
}
#[test]
fn parse_multiple_frames_from_buffer() {
let f1 = build_test_frame(
"messageStart",
"application/json",
br#"{"role":"assistant"}"#,
);
let f2 = build_test_frame(
"contentBlockDelta",
"application/json",
br#"{"delta":{"text":"Hi"}}"#,
);
let f3 = build_test_frame(
"messageStop",
"application/json",
br#"{"stopReason":"end_turn"}"#,
);
let mut buffer = Vec::new();
buffer.extend_from_slice(&f1);
buffer.extend_from_slice(&f2);
buffer.extend_from_slice(&f3);
let frames = parse_frames(&mut buffer);
assert_eq!(frames.len(), 3);
assert_eq!(frames[0].event_type, "messageStart");
assert_eq!(frames[1].event_type, "contentBlockDelta");
assert_eq!(frames[2].event_type, "messageStop");
assert!(buffer.is_empty());
}
#[test]
fn incomplete_frame_stays_in_buffer() {
let frame = build_test_frame("messageStart", "application/json", b"{}");
let mut buffer = frame[..frame.len() - 5].to_vec();
let frames = parse_frames(&mut buffer);
assert!(frames.is_empty());
assert!(!buffer.is_empty());
}
#[test]
fn second_chunk_completes_frame() {
let frame = build_test_frame("messageStart", "application/json", b"{}");
let split = frame.len() / 2;
let mut buffer = frame[..split].to_vec();
let mut frames = parse_frames(&mut buffer);
assert!(frames.is_empty());
buffer.extend_from_slice(&frame[split..]);
frames.extend(parse_frames(&mut buffer));
assert_eq!(frames.len(), 1);
assert!(buffer.is_empty());
}
#[test]
fn empty_buffer_returns_no_frames() {
let mut buffer = Vec::new();
let frames = parse_frames(&mut buffer);
assert!(frames.is_empty());
}
#[test]
fn frame_with_empty_payload() {
let frame = build_test_frame("contentBlockStop", "application/json", b"");
let mut buffer = frame;
let frames = parse_frames(&mut buffer);
assert_eq!(frames.len(), 1);
assert!(frames[0].payload.is_empty());
}
}