use bytes::Bytes;
pub trait SseEventParser: Send + Sync {
fn parse_data(&self, data: &str) -> Option<String>;
fn is_done(&self, data: &str) -> bool;
}
pub struct SseParser<P: SseEventParser> {
buffer: String,
parser: P,
}
impl<P: SseEventParser> SseParser<P> {
pub fn new(parser: P) -> Self {
Self {
buffer: String::new(),
parser,
}
}
pub fn parse_chunk(&mut self, bytes: &Bytes) -> Vec<String> {
let mut results = Vec::new();
if let Ok(text) = std::str::from_utf8(bytes) {
self.buffer.push_str(text);
} else {
return results;
}
while let Some(newline_pos) = self.buffer.find('\n') {
let line = self.buffer[..newline_pos].trim().to_string();
self.buffer = self.buffer[newline_pos + 1..].to_string();
if line.is_empty() {
continue;
}
if line.starts_with("event:") {
continue;
}
if let Some(data) = line.strip_prefix("data: ") {
if self.parser.is_done(data) {
continue;
}
if let Some(text) = self.parser.parse_data(data)
&& !text.is_empty()
{
results.push(text);
}
}
}
results
}
}
pub struct AnthropicEventParser;
impl SseEventParser for AnthropicEventParser {
fn parse_data(&self, data: &str) -> Option<String> {
let json: serde_json::Value = serde_json::from_str(data).ok()?;
if json.get("type")?.as_str()? != "content_block_delta" {
return None;
}
json.get("delta")?
.get("text")?
.as_str()
.map(|s| s.to_string())
}
fn is_done(&self, data: &str) -> bool {
data == "[DONE]"
}
}
pub struct OpenAiEventParser;
impl SseEventParser for OpenAiEventParser {
fn parse_data(&self, data: &str) -> Option<String> {
let json: serde_json::Value = serde_json::from_str(data).ok()?;
json.get("choices")?
.get(0)?
.get("delta")?
.get("content")?
.as_str()
.map(|s| s.to_string())
}
fn is_done(&self, data: &str) -> bool {
data == "[DONE]"
}
}
pub struct GeminiEventParser;
impl SseEventParser for GeminiEventParser {
fn parse_data(&self, data: &str) -> Option<String> {
let json: serde_json::Value = serde_json::from_str(data).ok()?;
json.get("candidates")?
.get(0)?
.get("content")?
.get("parts")?
.get(0)?
.get("text")?
.as_str()
.map(|s| s.to_string())
}
fn is_done(&self, data: &str) -> bool {
data.is_empty()
}
}
#[cfg(test)]
#[path = "sse_tests.rs"]
mod sse_tests;