use crate::error::LlmConnectorError;
use futures_util::{Stream, StreamExt};
use std::pin::Pin;
/// Converts an HTTP response body into a stream of Server-Sent Events payloads.
///
/// Raw bytes are buffered until a complete event (terminated by a blank line, using either
/// `\n` or `\r\n` line endings) is available; only then is the event decoded as UTF-8
/// (lossily). Decoding whole events instead of individual network chunks keeps multi-byte
/// characters and `\r\n` pairs intact when they straddle a chunk boundary.
///
/// For every event, the payloads of its `data:` lines are joined with `\n` and yielded as a
/// single `Ok(String)`. Empty payloads and the `[DONE]` sentinel are skipped, and an event
/// with no remaining payload yields nothing. A transport error is yielded as
/// `LlmConnectorError::NetworkError` carrying the error's message. Bytes that follow the
/// last complete event are dropped when the body ends.
#[inline]
pub fn sse_events(
    response: reqwest::Response,
) -> Pin<Box<dyn Stream<Item = Result<String, LlmConnectorError>> + Send>> {
    let events_stream = response
        .bytes_stream()
        .scan(Vec::<u8>::new(), |buffer, chunk_result| {
            let mut out: Vec<Result<String, LlmConnectorError>> = Vec::new();
            match chunk_result {
                Ok(chunk) => {
                    buffer.extend_from_slice(&chunk);
                    // Emit every event that is now complete; the unfinished tail stays
                    // buffered (as raw bytes) until more data arrives.
                    while let Some(event_end) = sse_event_end(buffer.as_slice()) {
                        let event_bytes: Vec<u8> = buffer.drain(..event_end).collect();
                        let event = String::from_utf8_lossy(&event_bytes);
                        if let Some(data) = sse_event_data(&event) {
                            out.push(Ok(data));
                        }
                    }
                }
                Err(e) => {
                    out.push(Err(LlmConnectorError::NetworkError(e.to_string())));
                }
            }
            std::future::ready(Some(out))
        })
        .flat_map(futures_util::stream::iter);
    Box::pin(events_stream)
}

/// Returns the exclusive end offset of the first complete SSE event in `buf`, if any.
///
/// An event ends at a line feed that is directly followed by an empty line, i.e. by `\n`
/// or by `\r\n`. Both bytes are ASCII and never occur inside a multi-byte UTF-8 sequence,
/// so scanning the undecoded bytes is safe.
fn sse_event_end(buf: &[u8]) -> Option<usize> {
    buf.iter().enumerate().find_map(|(idx, &byte)| {
        if byte != b'\n' {
            return None;
        }
        match &buf[idx + 1..] {
            [b'\n', ..] => Some(idx + 2),
            [b'\r', b'\n', ..] => Some(idx + 3),
            _ => None,
        }
    })
}

/// Joins the payloads of all `data:` lines of one decoded event with `\n`.
///
/// Trailing whitespace (including a `\r` left over from `\r\n` line endings) and the
/// whitespace after the `data:` prefix are trimmed. Empty payloads and `[DONE]` are
/// ignored. Returns `None` when no payload remains.
fn sse_event_data(event: &str) -> Option<String> {
    let data_lines: Vec<&str> = event
        .split('\n')
        .filter_map(|raw_line| {
            let payload = raw_line.trim_end().strip_prefix("data:")?.trim_start();
            (!payload.is_empty() && payload != "[DONE]").then_some(payload)
        })
        .collect();
    if data_lines.is_empty() {
        None
    } else {
        Some(data_lines.join("\n"))
    }
}
/// Converts an HTTP response body into a stream of newline-delimited text records
/// (for example JSON Lines / NDJSON).
///
/// Raw bytes are buffered and split on `\n`; each complete line is decoded as UTF-8
/// (lossily) and trimmed, so `\r\n` line endings are handled as well. Decoding whole lines
/// instead of individual network chunks keeps multi-byte characters intact when they
/// straddle a chunk boundary. Blank lines and the `[DONE]` sentinel are skipped.
///
/// When the body ends, a final line that lacks a trailing newline is still emitted. A
/// transport error is yielded as `LlmConnectorError::NetworkError` carrying the error's
/// message and discards the partially received line.
#[inline]
pub fn json_lines_events(
    response: reqwest::Response,
) -> Pin<Box<dyn Stream<Item = Result<String, LlmConnectorError>> + Send>> {
    let events_stream = response
        .bytes_stream()
        .map(Some)
        // A trailing `None` marks the end of the body so the last line can be flushed.
        .chain(futures_util::stream::once(std::future::ready(None)))
        .scan(Vec::<u8>::new(), |buffer, step| {
            let mut out: Vec<Result<String, LlmConnectorError>> = Vec::new();
            match step {
                Some(Ok(chunk)) => {
                    buffer.extend_from_slice(&chunk);
                    // `\n` is ASCII and never part of a multi-byte UTF-8 sequence, so the
                    // undecoded bytes can be split on it safely.
                    while let Some(newline_idx) = buffer.iter().position(|&b| b == b'\n') {
                        let line_bytes: Vec<u8> = buffer.drain(..=newline_idx).collect();
                        if let Some(line) = json_line_payload(&line_bytes) {
                            out.push(Ok(line));
                        }
                    }
                }
                Some(Err(e)) => {
                    // The pending line is incomplete after a transport failure; drop it so
                    // the end-of-stream flush cannot emit a truncated record.
                    buffer.clear();
                    out.push(Err(LlmConnectorError::NetworkError(e.to_string())));
                }
                None => {
                    if let Some(line) = json_line_payload(buffer.as_slice()) {
                        out.push(Ok(line));
                    }
                    buffer.clear();
                }
            }
            std::future::ready(Some(out))
        })
        .flat_map(futures_util::stream::iter);
    Box::pin(events_stream)
}

/// Decodes one raw line (lossily) and trims it; returns `None` for blank lines and for the
/// `[DONE]` sentinel.
fn json_line_payload(line_bytes: &[u8]) -> Option<String> {
    let decoded = String::from_utf8_lossy(line_bytes);
    let trimmed = decoded.trim();
    (!trimmed.is_empty() && trimmed != "[DONE]").then(|| trimmed.to_string())
}
/// Adapts an SSE response into a stream of parsed `crate::types::StreamingResponse` chunks.
///
/// Each payload produced by `sse_events` is parsed as JSON; a failure becomes
/// `LlmConnectorError::ParseError`, and errors coming from `sse_events` are passed through
/// unchanged.
///
/// Two post-processing steps are applied to every successfully parsed chunk:
///
/// * If the chunk's top-level `content` is empty, it is filled from the first choice's
///   delta: `content` when non-empty, otherwise the first present value among
///   `reasoning_content`, `reasoning`, `thought` and `thinking`.
/// * If the first choice's delta carries tool calls, they are merged per `index` (a missing
///   index is treated as `0`) into calls accumulated over the whole stream. The delta's
///   `tool_calls` is then replaced by every accumulated call for which `is_complete()`
///   holds, in ascending index order, or by `None` when there is none yet.
#[cfg(feature = "streaming")]
pub fn sse_to_streaming_response(response: reqwest::Response) -> crate::types::ChatStream {
    use crate::types::{StreamingResponse, ToolCall};
    use futures_util::StreamExt;
    use std::collections::BTreeMap;

    let string_stream = sse_events(response);
    let response_stream = string_stream.scan(
        // An ordered map keeps the emitted tool calls sorted by index; hash-map iteration
        // order is arbitrary and would shuffle parallel tool calls between runs.
        BTreeMap::<usize, ToolCall>::new(),
        |accumulated_tool_calls, result| {
            let processed = result.and_then(|json_str| {
                let mut streaming_response = serde_json::from_str::<StreamingResponse>(&json_str)
                    .map_err(|e| {
                        crate::error::LlmConnectorError::ParseError(format!(
                            "Failed to parse streaming response: {}",
                            e
                        ))
                    })?;

                // Populate the convenience `content` field from the first choice's delta.
                if streaming_response.content.is_empty()
                    && let Some(choice) = streaming_response.choices.first()
                {
                    let content_to_use = choice
                        .delta
                        .content
                        .as_ref()
                        .filter(|s| !s.is_empty())
                        .or(choice.delta.reasoning_content.as_ref())
                        .or(choice.delta.reasoning.as_ref())
                        .or(choice.delta.thought.as_ref())
                        .or(choice.delta.thinking.as_ref());
                    if let Some(content) = content_to_use {
                        streaming_response.content = content.clone();
                    }
                }

                // Merge incremental tool-call fragments and expose only complete calls.
                if let Some(choice) = streaming_response.choices.first_mut()
                    && let Some(delta_tool_calls) = &choice.delta.tool_calls
                {
                    for delta_call in delta_tool_calls {
                        let index = delta_call.index.unwrap_or(0);
                        accumulated_tool_calls
                            .entry(index)
                            .and_modify(|existing| existing.merge_delta(delta_call))
                            .or_insert_with(|| delta_call.clone());
                    }
                    let complete_calls: Vec<ToolCall> = accumulated_tool_calls
                        .values()
                        .filter(|call| call.is_complete())
                        .cloned()
                        .collect();
                    choice.delta.tool_calls = if complete_calls.is_empty() {
                        None
                    } else {
                        Some(complete_calls)
                    };
                }
                Ok(streaming_response)
            });
            std::future::ready(Some(processed))
        },
    );
    Box::pin(response_stream)
}
#[cfg(test)]
mod tests {
#[test]
#[cfg(feature = "streaming")]
fn test_streaming_response_content_population() {
use crate::types::StreamingResponse;
let json_standard = r#"{
"id": "test-1",
"object": "chat.completion.chunk",
"created": 1234567890,
"model": "gpt-4",
"choices": [{
"index": 0,
"delta": {
"role": "assistant",
"content": "Hello world"
},
"finish_reason": null
}]
}"#;
let mut response: StreamingResponse = serde_json::from_str(json_standard).unwrap();
if response.content.is_empty()
&& let Some(choice) = response.choices.first()
{
let content_to_use = choice
.delta
.content
.as_ref()
.filter(|s| !s.is_empty())
.or(choice.delta.reasoning_content.as_ref())
.or(choice.delta.reasoning.as_ref())
.or(choice.delta.thought.as_ref())
.or(choice.delta.thinking.as_ref());
if let Some(content) = content_to_use {
response.content = content.clone();
}
}
assert_eq!(response.content, "Hello world");
let json_volcengine = r#"{
"id": "test-2",
"object": "chat.completion.chunk",
"created": 1234567890,
"model": "doubao-seed-code",
"choices": [{
"index": 0,
"delta": {
"role": "assistant",
"content": "",
"reasoning_content": "I am Doubao"
},
"finish_reason": null
}]
}"#;
let mut response: StreamingResponse = serde_json::from_str(json_volcengine).unwrap();
if response.content.is_empty()
&& let Some(choice) = response.choices.first()
{
let content_to_use = choice
.delta
.content
.as_ref()
.filter(|s| !s.is_empty())
.or(choice.delta.reasoning_content.as_ref())
.or(choice.delta.reasoning.as_ref())
.or(choice.delta.thought.as_ref())
.or(choice.delta.thinking.as_ref());
if let Some(content) = content_to_use {
response.content = content.clone();
}
}
assert_eq!(response.content, "I am Doubao");
let json_deepseek = r#"{
"id": "test-3",
"object": "chat.completion.chunk",
"created": 1234567890,
"model": "deepseek-r1",
"choices": [{
"index": 0,
"delta": {
"role": "assistant",
"content": "",
"reasoning": "Let me think..."
},
"finish_reason": null
}]
}"#;
let mut response: StreamingResponse = serde_json::from_str(json_deepseek).unwrap();
if response.content.is_empty()
&& let Some(choice) = response.choices.first()
{
let content_to_use = choice
.delta
.content
.as_ref()
.filter(|s| !s.is_empty())
.or(choice.delta.reasoning_content.as_ref())
.or(choice.delta.reasoning.as_ref())
.or(choice.delta.thought.as_ref())
.or(choice.delta.thinking.as_ref());
if let Some(content) = content_to_use {
response.content = content.clone();
}
}
assert_eq!(response.content, "Let me think...");
}
#[test]
fn test_sse_event_parsing() {
let sse_data = "data: {\"test\": \"value\"}\n\n";
assert!(sse_data.starts_with("data: "));
let sse_done = "data: [DONE]\n\n";
assert!(sse_done.contains("[DONE]"));
}
}