pub(super) struct SseLineBuffer {
buf: Vec<u8>,
pos: usize,
}
impl SseLineBuffer {
pub(super) fn new() -> Self {
Self { buf: Vec::with_capacity(8192), pos: 0 }
}
pub(super) fn extend(&mut self, chunk: &[u8]) {
if self.pos > 0 && self.pos == self.buf.len() {
self.buf.clear();
self.pos = 0;
} else if self.pos > 4096 {
self.buf.drain(..self.pos);
self.pos = 0;
}
self.buf.extend_from_slice(chunk);
}
pub(super) fn next_line(&mut self) -> Option<&str> {
let start = self.pos;
let rel = memchr::memchr(b'\n', &self.buf[start..])?;
let end = start + rel;
self.pos = end + 1;
match std::str::from_utf8(&self.buf[start..end]) {
Ok(s) => Some(s.trim_end()),
Err(_) => Some(""), }
}
pub(super) fn take_remaining(&mut self) -> Option<String> {
if self.pos >= self.buf.len() {
return None;
}
let rest = String::from_utf8_lossy(&self.buf[self.pos..]).trim().to_string();
self.buf.clear();
self.pos = 0;
if rest.is_empty() { None } else { Some(rest) }
}
}
#[cfg(test)]
mod tests {
use super::*;
fn lines_with_chunking(input: &[u8], split_at: usize) -> Vec<String> {
let mut buf = SseLineBuffer::new();
let mut out = Vec::new();
let (a, b) = input.split_at(split_at.min(input.len()));
for chunk in [a, b] {
if chunk.is_empty() { continue; }
buf.extend(chunk);
while let Some(line) = buf.next_line() {
out.push(line.to_string());
}
}
if let Some(rest) = buf.take_remaining() {
out.push(rest);
}
out
}
#[test]
fn whole_lines_pass_through() {
let mut buf = SseLineBuffer::new();
buf.extend(b"data: {\"a\":1}\ndata: {\"b\":2}\n");
assert_eq!(buf.next_line(), Some("data: {\"a\":1}"));
assert_eq!(buf.next_line(), Some("data: {\"b\":2}"));
assert_eq!(buf.next_line(), None);
}
#[test]
fn line_split_across_chunks() {
let mut buf = SseLineBuffer::new();
buf.extend(b"data: {\"par");
assert_eq!(buf.next_line(), None, "no complete line yet");
buf.extend(b"tial\":true}\n");
assert_eq!(buf.next_line(), Some("data: {\"partial\":true}"));
}
#[test]
fn utf8_codepoint_split_across_chunks() {
let input = "data: {\"t\":\"…\"}\n".as_bytes();
for split in 0..input.len() {
let lines = lines_with_chunking(input, split);
assert_eq!(lines, vec!["data: {\"t\":\"…\"}"], "split at {split}");
}
}
#[test]
fn crlf_line_endings_trimmed() {
let mut buf = SseLineBuffer::new();
buf.extend(b"data: x\r\ndata: y\r\n");
assert_eq!(buf.next_line(), Some("data: x"));
assert_eq!(buf.next_line(), Some("data: y"));
}
#[test]
fn empty_lines_and_comments() {
let mut buf = SseLineBuffer::new();
buf.extend(b"\n: keepalive\n\ndata: real\n");
assert_eq!(buf.next_line(), Some(""));
assert_eq!(buf.next_line(), Some(": keepalive"));
assert_eq!(buf.next_line(), Some(""));
assert_eq!(buf.next_line(), Some("data: real"));
}
#[test]
fn remaining_partial_line_at_stream_end() {
let mut buf = SseLineBuffer::new();
buf.extend(b"data: complete\ndata: no-newline");
assert_eq!(buf.next_line(), Some("data: complete"));
assert_eq!(buf.next_line(), None);
assert_eq!(buf.take_remaining(), Some("data: no-newline".to_string()));
assert_eq!(buf.take_remaining(), None, "second take is empty");
}
#[test]
fn exhaustive_rechunking_preserves_event_stream() {
let input = "event: message\ndata: {\"text\":\"héllo ✨\"}\r\n: ping\ndata: [DONE]\n".as_bytes();
let baseline = lines_with_chunking(input, input.len());
for split in 1..input.len() {
assert_eq!(lines_with_chunking(input, split), baseline, "split at {split}");
}
}
#[test]
fn buffer_compaction_keeps_unread_bytes() {
let mut buf = SseLineBuffer::new();
for _ in 0..600 {
buf.extend(b"data: x\n");
assert_eq!(buf.next_line(), Some("data: x"));
}
buf.extend(b"data: tail-");
buf.extend(b"end\n");
assert_eq!(buf.next_line(), Some("data: tail-end"));
}
}