use anyhow::Result;
/// Incremental parser for a Server-Sent Events byte stream.
///
/// Bytes are fed in arbitrary chunks; complete frames are turned into
/// events while any unfinished tail stays buffered for the next chunk.
#[derive(Default, Debug)]
pub struct SseParser {
    /// Bytes received so far that have not yet been consumed as a complete frame.
    buffer: Vec<u8>,
    /// Provider name attached through `with_context`; `None` until it is set.
    provider: Option<String>,
    /// Model name attached through `with_context`; `None` until it is set.
    model: Option<String>,
}
impl SseParser {
    /// Creates a parser with an empty buffer and no provider/model context.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Stores the provider and model names on the parser, builder-style.
    ///
    /// Consumes `self` and returns it so the call can be chained after [`SseParser::new`].
    #[must_use]
    pub fn with_context(mut self, provider: impl Into<String>, model: impl Into<String>) -> Self {
        self.provider = Some(provider.into());
        self.model = Some(model.into());
        self
    }

    /// Appends `chunk` to the internal buffer and returns every event it completes.
    ///
    /// Bytes that follow the last frame separator remain buffered until a later
    /// `push` supplies the rest of the frame, or until [`SseParser::finish`] is called.
    /// Frames that carry no payload (comments, pings, `[DONE]`) produce no event.
    ///
    /// # Errors
    ///
    /// Propagates any error reported while parsing a frame.
    pub fn push(&mut self, chunk: &[u8]) -> Result<Vec<SseEvent>> {
        self.buffer.extend_from_slice(chunk);
        let mut events = Vec::new();
        while let Some(frame) = self.next_frame() {
            if let Some(event) = self.parse_frame(&frame)? {
                events.push(event);
            }
        }
        Ok(events)
    }

    /// Drains whatever is still buffered and parses it as one final frame.
    ///
    /// Returns an empty vector when nothing is buffered or when the leftover
    /// bytes do not form an event. The buffer is empty afterwards.
    ///
    /// # Errors
    ///
    /// Propagates any error reported while parsing the trailing frame.
    pub fn finish(&mut self) -> Result<Vec<SseEvent>> {
        if self.buffer.is_empty() {
            return Ok(Vec::new());
        }
        let trailing = std::mem::take(&mut self.buffer);
        let event = self.parse_frame(&String::from_utf8_lossy(&trailing))?;
        Ok(event.into_iter().collect())
    }

    /// Removes the next complete frame from the buffer and returns its text.
    ///
    /// A frame ends at a blank line, written either as `\n\n` or as `\r\n\r\n`.
    /// Returns `None` when the buffer holds no complete frame yet. The separator
    /// is consumed but not included in the returned text; invalid UTF-8 is
    /// replaced lossily.
    fn next_frame(&mut self) -> Option<String> {
        let lf = self
            .buffer
            .windows(2)
            .position(|window| window == b"\n\n")
            .map(|position| (position, 2));
        let crlf = self
            .buffer
            .windows(4)
            .position(|window| window == b"\r\n\r\n")
            .map(|position| (position, 4));

        // Cut at whichever separator comes FIRST. Preferring `\n\n` whenever it
        // appears anywhere in the buffer would swallow an earlier CRLF-terminated
        // frame and merge it with the frame that follows.
        let (position, separator_len) = lf
            .into_iter()
            .chain(crlf)
            .min_by_key(|&(position, _)| position)?;

        let frame = String::from_utf8_lossy(&self.buffer[..position]).into_owned();
        self.buffer.drain(..position + separator_len);
        Some(frame)
    }

    /// Interprets the text of a single frame.
    ///
    /// Lines starting with `:` are comments and are skipped. An `event:` line
    /// sets the event name (the last one wins); each `data:` line contributes one
    /// payload line, and payload lines are joined with `\n`. Other lines are ignored.
    ///
    /// Returns `Ok(None)` for frames that should not surface as an event: blank
    /// frames, frames named `ping`, frames without any `data:` line, and the
    /// `[DONE]` payload.
    fn parse_frame(&self, frame: &str) -> Result<Option<SseEvent>> {
        let trimmed = frame.trim();
        if trimmed.is_empty() {
            return Ok(None);
        }

        let mut data_lines: Vec<&str> = Vec::new();
        let mut event_name: Option<String> = None;
        for line in trimmed.lines() {
            if line.starts_with(':') {
                continue;
            }
            if let Some(name) = line.strip_prefix("event:") {
                event_name = Some(name.trim().to_string());
            } else if let Some(data) = line.strip_prefix("data:") {
                data_lines.push(data.trim_start());
            }
        }

        if matches!(event_name.as_deref(), Some("ping")) || data_lines.is_empty() {
            return Ok(None);
        }

        let payload = data_lines.join("\n");
        if payload == "[DONE]" {
            return Ok(None);
        }

        Ok(Some(SseEvent {
            event: event_name,
            data: payload,
        }))
    }
}
/// One event decoded from a Server-Sent Events frame.
#[derive(Clone, Debug)]
pub struct SseEvent {
    /// Value of the frame's `event:` field, or `None` when the frame had none.
    pub event: Option<String>,
    /// The frame's `data:` lines, joined with `\n`.
    pub data: String,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A complete frame delivered in a single chunk yields exactly one event.
    #[test]
    fn sse_parser_parses_single_frame() {
        let mut parser = SseParser::new();
        let input = concat!(
            "event: content_block_start\n",
            "data: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"text\",\"text\":\"\"}}\n\n"
        );

        let parsed = parser.push(input.as_bytes()).expect("frame should parse");

        assert_eq!(parsed.len(), 1);
        let only = &parsed[0];
        assert_eq!(only.event, Some("content_block_start".to_string()));
        assert!(only.data.contains("content_block_start"));
    }

    /// A frame split across two chunks is held back until its separator arrives.
    #[test]
    fn sse_parser_handles_chunked_stream() {
        let head = b"event: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"Hel";
        let tail = b"lo\"}}\n\n";
        let mut parser = SseParser::new();

        let buffered = parser.push(head).expect("first chunk should buffer");
        assert!(buffered.is_empty());

        let parsed = parser.push(tail).expect("second chunk should parse");
        assert_eq!(parsed.len(), 1);
        assert!(parsed[0].data.contains("Hello"));
    }

    /// Comments, `ping` events and the `[DONE]` payload never surface as events.
    #[test]
    fn sse_parser_ignores_ping_and_done() {
        let stream = concat!(
            ": keepalive\n",
            "event: ping\n",
            "data: {\"type\":\"ping\"}\n\n",
            "event: message_delta\n",
            "data: {\"type\":\"message_delta\",\"delta\":{\"stop_reason\":\"end_turn\"}}\n\n",
            "data: [DONE]\n\n"
        );
        let mut parser = SseParser::new();

        let parsed = parser
            .push(stream.as_bytes())
            .expect("parser should succeed");

        assert_eq!(parsed.len(), 1);
    }
}