/// Drains every complete Server-Sent Events record from `buf` and returns the
/// `data` payload of each one, in order.
///
/// A record is complete once `find_event_boundary` locates its terminating
/// blank line. Bytes after the last complete record are left in `buf`, so the
/// caller can append the next network chunk and call this function again.
/// Because decoding happens per complete record, a multi-byte UTF-8 sequence
/// split across chunks is reassembled before it is decoded; bytes that are
/// still invalid UTF-8 are replaced lossily.
///
/// Within a record:
/// * only lines starting with `data:` contribute; comment lines (leading
///   `:`), empty lines and other fields such as `event:` are ignored;
/// * one optional space after `data:` is removed;
/// * multiple `data:` lines are joined with `\n`;
/// * a trailing `\r` on a line is treated as part of the line terminator, so
///   CRLF-framed multi-line records do not leak `\r` into the payload.
///
/// Records without any `data:` line, and records whose payload is exactly
/// `[DONE]`, are consumed from the buffer but not returned.
pub fn drain_sse_events(buf: &mut Vec<u8>) -> Vec<String> {
    let mut events = Vec::new();
    // Number of leading bytes of `buf` that belong to fully processed records.
    // The buffer is drained once after the loop rather than once per record,
    // which avoids shifting the remaining bytes and allocating a scratch copy
    // for every record.
    let mut consumed = 0usize;

    while let Some(end) = find_event_boundary(&buf[consumed..]) {
        let raw = &buf[consumed..consumed + end];
        consumed += end;

        let text = String::from_utf8_lossy(strip_trailing_separator(raw));
        let data_lines: Vec<&str> = text
            .split('\n')
            // With CRLF line endings, splitting on '\n' leaves a '\r' at the
            // end of every interior line; it is a terminator, not payload.
            .map(|line| line.strip_suffix('\r').unwrap_or(line))
            // Comments, blank lines and non-`data` fields never match this prefix.
            .filter_map(|line| line.strip_prefix("data:"))
            .map(|value| value.strip_prefix(' ').unwrap_or(value))
            .collect();

        if data_lines.is_empty() {
            continue;
        }
        let payload = data_lines.join("\n");
        // `[DONE]` is an end-of-stream sentinel, not a real payload.
        if payload != "[DONE]" {
            events.push(payload);
        }
    }

    buf.drain(..consumed);
    events
}
/// Returns the offset one past the first event separator in `buf`, or `None`
/// when no separator is present yet.
///
/// Two separators are recognised: `\n\n` and `\r\n\r\n`. When both occur, the
/// one that ends earlier wins, so the returned length always covers exactly
/// the first complete record including its separator.
fn find_event_boundary(buf: &[u8]) -> Option<usize> {
    // Convert each match position into the end offset of its separator.
    let lf_end = find_subsequence(buf, b"\n\n").map(|start| start + 2);
    let crlf_end = find_subsequence(buf, b"\r\n\r\n").map(|start| start + 4);

    match (lf_end, crlf_end) {
        (Some(lf), Some(crlf)) => Some(lf.min(crlf)),
        // At most one side matched; take whichever is present.
        (lf, crlf) => lf.or(crlf),
    }
}
/// Removes a single trailing event separator from `raw`.
///
/// A `\r\n\r\n` suffix is tried first, then `\n\n`; input ending in neither is
/// returned unchanged.
fn strip_trailing_separator(raw: &[u8]) -> &[u8] {
    raw.strip_suffix(b"\r\n\r\n")
        .or_else(|| raw.strip_suffix(b"\n\n"))
        .unwrap_or(raw)
}
/// Returns the index of the first occurrence of `needle` in `haystack`, or
/// `None` if it does not occur.
///
/// An empty `needle` matches at index 0, mirroring `str::find("")`. Without
/// this guard `slice::windows` would panic on a window size of zero.
fn find_subsequence(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    if needle.is_empty() {
        return Some(0);
    }
    haystack
        .windows(needle.len())
        .position(|window| window == needle)
}
#[cfg(test)]
mod tests {
    use super::drain_sse_events;

    /// Builds the owned `Vec<String>` that `drain_sse_events` is expected to return.
    fn owned(items: &[&str]) -> Vec<String> {
        items.iter().map(|item| String::from(*item)).collect()
    }

    /// An empty buffer produces no events and stays empty.
    #[test]
    fn empty_buffer_yields_nothing() {
        let mut pending = Vec::<u8>::new();
        let drained = drain_sse_events(&mut pending);
        assert!(drained.is_empty());
        assert!(pending.is_empty());
    }

    /// A single terminated event is returned and removed from the buffer.
    #[test]
    fn complete_event_is_drained() {
        let mut pending = b"data: hello\n\n".to_vec();
        let drained = drain_sse_events(&mut pending);
        assert_eq!(drained, owned(&["hello"]));
        assert!(pending.is_empty());
    }

    /// Several events delivered in one chunk come back in order.
    #[test]
    fn multiple_events_in_one_chunk() {
        let mut pending = b"data: one\n\ndata: two\n\ndata: three\n\n".to_vec();
        let drained = drain_sse_events(&mut pending);
        assert_eq!(drained, owned(&["one", "two", "three"]));
        assert!(pending.is_empty());
    }

    /// The `[DONE]` sentinel is consumed but never surfaced as an event.
    #[test]
    fn done_sentinel_filtered() {
        let mut pending = b"data: real\n\ndata: [DONE]\n\n".to_vec();
        let drained = drain_sse_events(&mut pending);
        assert_eq!(drained, owned(&["real"]));
    }

    /// Comment-only records and non-`data` fields contribute nothing.
    #[test]
    fn comment_and_event_field_ignored() {
        let mut pending = b": this is a heartbeat\n\nevent: chunk\ndata: payload\n\n".to_vec();
        let drained = drain_sse_events(&mut pending);
        assert_eq!(drained, owned(&["payload"]));
    }

    /// Bytes after the last separator remain buffered for the next call.
    #[test]
    fn partial_event_left_in_buffer() {
        let mut pending = b"data: complete\n\ndata: partial".to_vec();
        let drained = drain_sse_events(&mut pending);
        assert_eq!(drained, owned(&["complete"]));
        assert_eq!(pending, b"data: partial".to_vec());
    }

    /// An event whose separator arrives in a later chunk is emitted once complete.
    #[test]
    fn event_split_across_chunks_reassembles() {
        let mut pending = Vec::<u8>::new();

        pending.extend_from_slice(b"data: split me");
        assert!(drain_sse_events(&mut pending).is_empty());

        pending.extend_from_slice(b"\n\n");
        let drained = drain_sse_events(&mut pending);
        assert_eq!(drained, owned(&["split me"]));
    }

    /// A multi-byte UTF-8 character split between chunks is decoded intact.
    #[test]
    fn cjk_payload_split_across_chunks_preserved() {
        let mut pending = Vec::<u8>::new();

        // First chunk ends in the middle of the three-byte encoding of '你'.
        pending.extend_from_slice(b"data: \xE4\xBD");
        assert!(drain_sse_events(&mut pending).is_empty());

        // Second chunk completes '你', adds '好', and terminates the event.
        pending.extend_from_slice(b"\xA0\xE5\xA5\xBD\n\n");
        let drained = drain_sse_events(&mut pending);
        assert_eq!(drained, owned(&["你好"]));
    }

    /// The CRLF form of the separator terminates an event as well.
    #[test]
    fn crlf_separator_is_handled() {
        let mut pending = b"data: spec-canonical\r\n\r\n".to_vec();
        let drained = drain_sse_events(&mut pending);
        assert_eq!(drained, owned(&["spec-canonical"]));
    }

    /// Consecutive `data:` lines in one event are joined with a newline.
    #[test]
    fn multiline_data_field_joined_with_newline() {
        let mut pending = b"data: line one\ndata: line two\n\n".to_vec();
        let drained = drain_sse_events(&mut pending);
        assert_eq!(drained, owned(&["line one\nline two"]));
    }
}