use crate::error::LlmConnectorError;
use futures_util::{Stream, StreamExt};
use std::pin::Pin;
/// Converts an HTTP response body into a stream of Server-Sent Events payloads.
///
/// The body is buffered as raw bytes and split into events at blank lines
/// (`\n\n`, `\r\n\r\n`, or a mix of both line endings). For every complete
/// event, the contents of its `data:` lines are collected, joined with `\n`,
/// and yielded as one `Ok(String)`. Lines without a `data:` prefix, empty
/// payloads and the `[DONE]` sentinel are skipped; an event with no remaining
/// payload yields nothing.
///
/// Bytes are decoded only once a whole event has been received, so multi-byte
/// UTF-8 characters and `\r\n` pairs that straddle two network chunks are
/// handled correctly. Invalid UTF-8 is replaced lossily.
///
/// An event that is still incomplete (no terminating blank line) when the body
/// ends is not emitted.
///
/// # Errors
///
/// A transport error reported by the body stream is yielded as
/// `LlmConnectorError::NetworkError` carrying the error's display text.
#[inline]
pub fn sse_events(
    response: reqwest::Response,
) -> Pin<Box<dyn Stream<Item = Result<String, LlmConnectorError>> + Send>> {
    let events_stream = response
        .bytes_stream()
        .scan(Vec::<u8>::new(), move |buffer, chunk_result| {
            let out: Vec<Result<String, LlmConnectorError>> = match chunk_result {
                Ok(chunk) => {
                    buffer.extend_from_slice(&chunk);
                    drain_sse_events(buffer).into_iter().map(Ok).collect()
                }
                Err(e) => vec![Err(LlmConnectorError::NetworkError(e.to_string()))],
            };
            std::future::ready(Some(out))
        })
        .flat_map(futures_util::stream::iter);
    Box::pin(events_stream)
}

/// Removes every complete SSE event from the front of `buffer` and returns the
/// non-empty payloads; bytes of a trailing incomplete event stay in `buffer`.
fn drain_sse_events(buffer: &mut Vec<u8>) -> Vec<String> {
    let mut events = Vec::new();
    let mut consumed = 0;
    while let Some(len) = sse_event_len(&buffer[consumed..]) {
        // The event is complete, so decoding cannot cut a character in half.
        let raw = String::from_utf8_lossy(&buffer[consumed..consumed + len]);
        events.extend(parse_sse_event(&raw));
        consumed += len;
    }
    // Drain once per chunk rather than once per event.
    buffer.drain(..consumed);
    events
}

/// Returns the byte length of the first complete event in `bytes`, including
/// its blank-line terminator, or `None` if no terminator has arrived yet.
fn sse_event_len(bytes: &[u8]) -> Option<usize> {
    bytes
        .iter()
        .enumerate()
        .filter(|&(_, &byte)| byte == b'\n')
        .find_map(|(idx, _)| {
            // A blank line is a line feed directly followed by another line
            // ending, which may be either LF or CRLF.
            let rest = &bytes[idx + 1..];
            if rest.starts_with(b"\n") {
                Some(idx + 2)
            } else if rest.starts_with(b"\r\n") {
                Some(idx + 3)
            } else {
                None
            }
        })
}

/// Extracts the joined `data:` payload of one raw event, or `None` when the
/// event carries no payload besides blanks and the `[DONE]` sentinel.
fn parse_sse_event(raw: &str) -> Option<String> {
    let data_lines: Vec<&str> = raw
        .split('\n')
        // `trim_end` also removes the `\r` left over from CRLF line endings.
        .filter_map(|line| line.trim_end().strip_prefix("data:"))
        .map(str::trim_start)
        .filter(|payload| !payload.is_empty() && *payload != "[DONE]")
        .collect();
    if data_lines.is_empty() {
        None
    } else {
        Some(data_lines.join("\n"))
    }
}
/// Converts an HTTP response body into a stream of newline-delimited records
/// (JSON Lines / NDJSON style).
///
/// The body is buffered as raw bytes and split at `\n`. Each line is decoded,
/// trimmed of surrounding whitespace (which also removes a trailing `\r`), and
/// yielded as `Ok(String)`. Blank lines and the `[DONE]` sentinel are skipped.
///
/// Bytes are decoded only once a whole line has been received, so multi-byte
/// UTF-8 characters that straddle two network chunks are preserved. Invalid
/// UTF-8 is replaced lossily.
///
/// When the body ends, any buffered text that was not terminated by a newline
/// is emitted as a final record. This flush is skipped if a transport error
/// was seen, so a truncated record is not reported as `Ok`.
///
/// # Errors
///
/// A transport error reported by the body stream is yielded as
/// `LlmConnectorError::NetworkError` carrying the error's display text.
#[inline]
pub fn json_lines_events(
    response: reqwest::Response,
) -> Pin<Box<dyn Stream<Item = Result<String, LlmConnectorError>> + Send>> {
    let events_stream = response
        .bytes_stream()
        .map(Some)
        // A trailing `None` marks the end of the body so the last,
        // newline-less line can be flushed from the buffer.
        .chain(futures_util::stream::once(std::future::ready(None)))
        .scan((Vec::<u8>::new(), false), move |state, item| {
            let (buffer, failed) = state;
            let out: Vec<Result<String, LlmConnectorError>> = match item {
                Some(Ok(chunk)) => {
                    buffer.extend_from_slice(&chunk);
                    drain_json_lines(buffer).into_iter().map(Ok).collect()
                }
                Some(Err(e)) => {
                    *failed = true;
                    vec![Err(LlmConnectorError::NetworkError(e.to_string()))]
                }
                // After a transport error the leftover bytes are likely a
                // truncated record; do not present them as valid output.
                None if *failed => Vec::new(),
                None => parse_json_line(&std::mem::take(buffer))
                    .into_iter()
                    .map(Ok)
                    .collect(),
            };
            std::future::ready(Some(out))
        })
        .flat_map(futures_util::stream::iter);
    Box::pin(events_stream)
}

/// Removes every newline-terminated line from the front of `buffer` and
/// returns the non-empty records; an unterminated tail stays in `buffer`.
fn drain_json_lines(buffer: &mut Vec<u8>) -> Vec<String> {
    let mut lines = Vec::new();
    let mut consumed = 0;
    while let Some(offset) = buffer[consumed..].iter().position(|&byte| byte == b'\n') {
        let end = consumed + offset + 1;
        lines.extend(parse_json_line(&buffer[consumed..end]));
        consumed = end;
    }
    // Drain once per chunk rather than once per line.
    buffer.drain(..consumed);
    lines
}

/// Decodes and trims one raw line, returning `None` for blank lines and the
/// `[DONE]` sentinel.
fn parse_json_line(raw: &[u8]) -> Option<String> {
    let text = String::from_utf8_lossy(raw);
    let trimmed = text.trim();
    (!trimmed.is_empty() && trimmed != "[DONE]").then(|| trimmed.to_string())
}