use futures::stream::{self, Stream, StreamExt};
use crate::client::ClientError;
/// Extension trait for consuming a response as a stream of text items.
///
/// The implementation in this file (for `reqwest::Response`) yields the
/// payload of each `data` line of a Server-Sent Events body.
pub trait SSEResponseExt {
/// Consumes `self` and returns a `Send` stream whose items are either an
/// owned `String` payload or a [`ClientError`].
fn sse(self) -> impl Stream<Item = Result<String, ClientError>> + Send;
}
impl SSEResponseExt for reqwest::Response {
    /// Streams the payload of every `data` line in the response body.
    ///
    /// The body is read chunk by chunk and split into lines on `\n`. Each line
    /// is trimmed; blank lines and lines that [`parse_sse_line`] does not
    /// recognise are skipped. The stream finishes when a payload satisfies
    /// [`is_done_marker`] (that payload is not yielded) or when the body ends.
    /// A final line without a trailing newline is still processed.
    ///
    /// Transport errors are yielded as `Err(ClientError)`, after any events
    /// that had already been received in full. Bytes that are not valid UTF-8
    /// are replaced with U+FFFD rather than being dropped.
    fn sse(self) -> impl Stream<Item = Result<String, ClientError>> + Send {
        let byte_stream = self.bytes_stream();
        stream::unfold(
            // State: the pinned body stream, raw bytes not yet consumed as
            // lines, and whether the body stream has reported its end.
            (Box::pin(byte_stream), Vec::<u8>::new(), false),
            |(mut byte_stream, mut buffer, mut stream_ended)| async move {
                loop {
                    // Serve lines that are already buffered before touching the
                    // network again, so events that arrived together in one
                    // chunk are not held back until the next chunk shows up.
                    while let Some(line) = take_sse_line(&mut buffer, stream_ended) {
                        if line.is_empty() {
                            continue;
                        }
                        if let Some(data) = parse_sse_line(&line) {
                            if is_done_marker(data) {
                                return None;
                            }
                            return Some((
                                Ok(data.to_string()),
                                (byte_stream, buffer, stream_ended),
                            ));
                        }
                    }

                    // Nothing buffered and the body is finished: end of stream.
                    // This also guarantees the body stream is never polled
                    // again after it returned `None`.
                    if stream_ended {
                        return None;
                    }

                    match byte_stream.next().await {
                        // Keep raw bytes: a multi-byte character may be split
                        // across two chunks, so decoding has to wait until a
                        // complete line is available.
                        Some(Ok(chunk)) => buffer.extend_from_slice(&chunk),
                        Some(Err(e)) => {
                            return Some((
                                Err(ClientError::from(e)),
                                (byte_stream, buffer, stream_ended),
                            ));
                        }
                        None => stream_ended = true,
                    }
                }
            },
        )
    }
}

/// Removes the next line from `buffer` and returns it decoded and trimmed.
///
/// A line normally ends at the first `\n`, which is consumed together with the
/// line. When `at_eof` is true and no newline is left, the remaining bytes (if
/// any) form the final line. Returns `None` when no line is available yet.
fn take_sse_line(buffer: &mut Vec<u8>, at_eof: bool) -> Option<String> {
    // `end` is where the line's content stops; `consumed` additionally covers
    // the newline terminator when there is one.
    let (end, consumed) = match buffer.iter().position(|&byte| byte == b'\n') {
        Some(pos) => (pos, pos + 1),
        None if at_eof && !buffer.is_empty() => (buffer.len(), buffer.len()),
        None => return None,
    };
    // Splitting on the byte 0x0A is safe: it never occurs inside a multi-byte
    // UTF-8 sequence, so a complete line holds only complete characters.
    let line = String::from_utf8_lossy(&buffer[..end]).trim().to_string();
    buffer.drain(..consumed);
    Some(line)
}
/// Extracts the payload of an SSE `data` field from a single line.
///
/// Returns `Some(payload)` with surrounding whitespace removed when `line`
/// starts with `data:`, and `None` for every other line. The space after the
/// colon is optional in the event-stream format, so both `data: x` and
/// `data:x` yield `Some("x")`. A `data:` line without a payload yields
/// `Some("")`.
pub fn parse_sse_line(line: &str) -> Option<&str> {
    line.strip_prefix("data:").map(str::trim)
}
/// Reports whether `data` is exactly the end-of-stream sentinel `[DONE]`.
///
/// The comparison is exact: no trimming and no case folding.
pub fn is_done_marker(data: &str) -> bool {
    matches!(data, "[DONE]")
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `data: ` lines yield their trimmed payload; anything else yields `None`.
    #[test]
    fn test_parse_sse_line() {
        let cases: [(&str, Option<&str>); 5] = [
            ("data: hello", Some("hello")),
            ("data: {\"key\": \"value\"}", Some("{\"key\": \"value\"}")),
            ("data: spaces ", Some("spaces")),
            ("invalid", None),
            ("", None),
        ];
        for (input, expected) in cases {
            assert_eq!(parse_sse_line(input), expected);
        }
    }

    /// Only the exact `[DONE]` payload counts as the end-of-stream marker.
    #[test]
    fn test_is_done_marker() {
        assert!(is_done_marker("[DONE]"));
        for other in ["", "data", "{\"key\": \"value\"}"] {
            assert!(!is_done_marker(other));
        }
    }
}