use futures::stream::{BoxStream, TryStreamExt};
use reqwest::Response;
use tracing::{debug, warn};
use super::error::{Result, VsCodeError};
use super::types::ChatCompletionChunk;
pub(super) fn parse_sse_stream(response: Response) -> BoxStream<'static, Result<String>> {
let mut buffer = String::new();
let stream = response
.bytes_stream()
.map_err(|e| VsCodeError::Stream(e.to_string()))
.try_filter_map(move |chunk| {
buffer.push_str(&String::from_utf8_lossy(&chunk));
let mut lines = Vec::new();
while let Some(idx) = buffer.find('\n') {
let line = buffer[..idx].trim().to_string();
buffer.drain(..=idx);
if !line.is_empty() {
lines.push(line);
}
}
futures::future::ready(Ok(if lines.is_empty() {
None
} else {
Some(futures::stream::iter(lines.into_iter().map(Ok)))
}))
})
.try_flatten()
.try_filter_map(|line| async move {
if let Some(data) = line.strip_prefix("data: ") {
if data.trim() == "[DONE]" {
debug!("Received [DONE] signal, ending stream");
return Ok(None);
}
match serde_json::from_str::<ChatCompletionChunk>(data) {
Ok(chunk) => {
if let Some(choice) = chunk.choices.first() {
if let Some(content) = &choice.delta.content {
if !content.is_empty() {
debug!(content_len = content.len(), "Received content delta");
return Ok(Some(content.clone()));
}
}
}
Ok(None)
}
Err(e) => {
warn!(error = %e, data = %data, "Failed to parse SSE chunk");
Err(VsCodeError::Stream(format!("Failed to parse chunk: {}", e)))
}
}
} else if line.starts_with("event: ") || line.starts_with("id: ") {
Ok(None)
} else if line.starts_with(':') {
Ok(None)
} else {
warn!(line = %line, "Unexpected SSE line format");
Ok(None)
}
});
Box::pin(stream)
}
pub(super) fn parse_sse_stream_with_tools(
response: Response,
) -> BoxStream<'static, Result<crate::traits::StreamChunk>> {
use crate::traits::StreamChunk;
let mut buffer = String::new();
let stream = response
.bytes_stream()
.map_err(|e| VsCodeError::Stream(e.to_string()))
.try_filter_map(move |chunk| {
buffer.push_str(&String::from_utf8_lossy(&chunk));
let mut lines = Vec::new();
while let Some(idx) = buffer.find('\n') {
let line = buffer[..idx].trim().to_string();
buffer.drain(..=idx);
if !line.is_empty() {
lines.push(line);
}
}
futures::future::ready(Ok(if lines.is_empty() {
None
} else {
Some(futures::stream::iter(lines.into_iter().map(Ok)))
}))
})
.try_flatten()
.try_filter_map(|line| async move {
if let Some(data) = line.strip_prefix("data: ") {
if data.trim() == "[DONE]" {
debug!("Received [DONE] signal, ending stream");
return Ok(Some(StreamChunk::Finished {
reason: "stop".to_string(),
ttft_ms: None,
}));
}
match serde_json::from_str::<ChatCompletionChunk>(data) {
Ok(chunk) => {
if let Some(choice) = chunk.choices.first() {
if let Some(ref finish_reason) = choice.finish_reason {
debug!(reason = %finish_reason, "Stream finished");
return Ok(Some(StreamChunk::Finished {
reason: finish_reason.clone(),
ttft_ms: None,
}));
}
if let Some(ref tool_calls) = choice.delta.tool_calls {
if let Some(tc) = tool_calls.first() {
let function_name =
tc.function.as_ref().and_then(|f| f.name.clone());
let function_arguments =
tc.function.as_ref().and_then(|f| f.arguments.clone());
debug!(
index = tc.index,
id = ?tc.id,
name = ?function_name,
"Received tool call delta"
);
return Ok(Some(StreamChunk::ToolCallDelta {
index: tc.index,
id: tc.id.clone(),
function_name,
function_arguments,
thought_signature: None,
}));
}
}
if let Some(ref content) = choice.delta.content {
if !content.is_empty() {
debug!(content_len = content.len(), "Received content delta");
return Ok(Some(StreamChunk::Content(content.clone())));
}
}
}
Ok(None)
}
Err(e) => {
warn!(error = %e, data = %data, "Failed to parse SSE chunk");
Err(VsCodeError::Stream(format!("Failed to parse chunk: {}", e)))
}
}
} else if line.starts_with("event: ") || line.starts_with("id: ") {
Ok(None)
} else if line.starts_with(':') {
Ok(None)
} else {
warn!(line = %line, "Unexpected SSE line format");
Ok(None)
}
});
Box::pin(stream)
}
#[cfg(test)]
mod tests {
    //! Format-level checks for SSE line prefixes and `ChatCompletionChunk` deserialization.
    use super::*;

    #[test]
    fn test_parse_done_signal() {
        let data = "data: [DONE]";
        let content = data
            .strip_prefix("data: ")
            .expect("line should start with `data: `");
        assert_eq!(content.trim(), "[DONE]");
    }

    #[test]
    fn test_done_signal_with_whitespace() {
        // Covers trailing whitespace, CRLF residue, and the no-space `data:` form
        // that the SSE format permits.
        let variations = [
            "data: [DONE]",
            "data: [DONE] ",
            "data: [DONE]\r",
            "data:[DONE]",
        ];
        for data in variations {
            assert!(data.starts_with("data:"), "Should start with data:");
            let content = data.strip_prefix("data:").unwrap().trim();
            assert_eq!(content, "[DONE]", "Failed for: {:?}", data);
        }
    }

    #[test]
    fn test_done_signal_is_case_sensitive() {
        let invalid = ["data: [done]", "data: [Done]", "data: done"];
        for data in invalid {
            let content = data.strip_prefix("data: ").unwrap_or("").trim();
            assert_ne!(content, "[DONE]", "[DONE] check should be case-sensitive");
        }
    }

    #[test]
    fn test_sse_event_line_prefix() {
        let line = "event: message";
        assert!(line.starts_with("event: "), "Should recognize event prefix");
    }

    #[test]
    fn test_sse_id_line_prefix() {
        let line = "id: 12345";
        assert!(line.starts_with("id: "), "Should recognize id prefix");
    }

    #[test]
    fn test_sse_comment_line_prefix() {
        let comment = ": this is a comment";
        assert!(comment.starts_with(':'), "Should recognize comment prefix");
    }

    #[test]
    fn test_sse_data_line_prefix() {
        let data_line = "data: {\"content\":\"hello\"}";
        assert!(data_line.starts_with("data: "));
        let json = data_line.strip_prefix("data: ").unwrap();
        assert!(json.starts_with('{'));
    }

    #[test]
    fn test_parse_chunk_format() {
        let json = r#"{"id":"test","object":"chat.completion.chunk","created":123,"model":"gpt-4o","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}"#;
        let chunk: std::result::Result<ChatCompletionChunk, _> = serde_json::from_str(json);
        assert!(chunk.is_ok());
        let chunk = chunk.unwrap();
        assert_eq!(chunk.id, "test");
        assert_eq!(chunk.choices[0].delta.content, Some("Hello".to_string()));
    }

    #[test]
    fn test_chunk_with_empty_content() {
        let json = r#"{"id":"test","object":"chat.completion.chunk","created":123,"model":"gpt-4o","choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null}]}"#;
        let chunk: ChatCompletionChunk = serde_json::from_str(json).unwrap();
        assert_eq!(chunk.choices[0].delta.content, Some("".to_string()));
    }

    #[test]
    fn test_chunk_with_no_content() {
        let json = r#"{"id":"test","object":"chat.completion.chunk","created":123,"model":"gpt-4o","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}"#;
        let chunk: ChatCompletionChunk = serde_json::from_str(json).unwrap();
        assert!(chunk.choices[0].delta.content.is_none());
        assert_eq!(chunk.choices[0].finish_reason, Some("stop".to_string()));
    }

    #[test]
    fn test_chunk_with_role_only() {
        let json = r#"{"id":"test","object":"chat.completion.chunk","created":123,"model":"gpt-4o","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]}"#;
        let chunk: ChatCompletionChunk = serde_json::from_str(json).unwrap();
        assert_eq!(chunk.choices[0].delta.role, Some("assistant".to_string()));
        assert!(chunk.choices[0].delta.content.is_none());
    }

    #[test]
    fn test_chunk_malformed_json() {
        let bad_json = r#"{"id":"test", broken json"#;
        let result: std::result::Result<ChatCompletionChunk, _> = serde_json::from_str(bad_json);
        assert!(result.is_err(), "Malformed JSON should fail to parse");
    }

    #[test]
    fn test_chunk_multiple_choices() {
        let json = r#"{
            "id":"test",
            "object":"chat.completion.chunk",
            "created":123,
            "model":"gpt-4o",
            "choices":[
                {"index":0,"delta":{"content":"First"},"finish_reason":null},
                {"index":1,"delta":{"content":"Second"},"finish_reason":null}
            ]
        }"#;
        let chunk: ChatCompletionChunk = serde_json::from_str(json).unwrap();
        assert_eq!(chunk.choices.len(), 2);
        assert_eq!(chunk.choices[0].delta.content, Some("First".to_string()));
        assert_eq!(chunk.choices[1].delta.content, Some("Second".to_string()));
    }

    #[test]
    fn test_chunk_empty_choices() {
        let json = r#"{"id":"test","object":"chat.completion.chunk","created":123,"model":"gpt-4o","choices":[]}"#;
        let chunk: ChatCompletionChunk = serde_json::from_str(json).unwrap();
        assert!(chunk.choices.is_empty());
    }

    #[test]
    fn test_extract_content_from_choice() {
        let json = r#"{"id":"abc","object":"chat.completion.chunk","created":123,"model":"gpt-4o","choices":[{"index":0,"delta":{"content":"World"},"finish_reason":null}]}"#;
        let chunk: ChatCompletionChunk = serde_json::from_str(json).unwrap();
        let content = chunk
            .choices
            .first()
            .and_then(|c| c.delta.content.as_ref())
            .filter(|s| !s.is_empty());
        assert_eq!(content, Some(&"World".to_string()));
    }

    #[test]
    fn test_extract_content_filters_empty() {
        let json = r#"{"id":"abc","object":"chat.completion.chunk","created":123,"model":"gpt-4o","choices":[{"index":0,"delta":{"content":""},"finish_reason":null}]}"#;
        let chunk: ChatCompletionChunk = serde_json::from_str(json).unwrap();
        let content = chunk
            .choices
            .first()
            .and_then(|c| c.delta.content.as_ref())
            .filter(|s| !s.is_empty());
        assert!(content.is_none(), "Empty content should be filtered out");
    }

    #[test]
    fn test_extract_content_with_unicode() {
        let json = r#"{"id":"abc","object":"chat.completion.chunk","created":123,"model":"gpt-4o","choices":[{"index":0,"delta":{"content":"Hello 世界 🌍"},"finish_reason":null}]}"#;
        let chunk: ChatCompletionChunk = serde_json::from_str(json).unwrap();
        assert_eq!(
            chunk.choices[0].delta.content,
            Some("Hello 世界 🌍".to_string())
        );
    }

    #[test]
    fn test_extract_content_with_newlines() {
        let json = r#"{"id":"abc","object":"chat.completion.chunk","created":123,"model":"gpt-4o","choices":[{"index":0,"delta":{"content":"line1\nline2\nline3"},"finish_reason":null}]}"#;
        let chunk: ChatCompletionChunk = serde_json::from_str(json).unwrap();
        let content = chunk.choices[0].delta.content.as_ref().unwrap();
        assert!(content.contains('\n'));
        assert_eq!(content.lines().count(), 3);
    }

    #[test]
    fn test_finish_reason_values() {
        let reasons = ["stop", "length", "content_filter", "tool_calls"];
        for reason in reasons {
            let json = format!(
                r#"{{"id":"test","object":"chat.completion.chunk","created":123,"model":"gpt-4o","choices":[{{"index":0,"delta":{{}},"finish_reason":"{}"}}]}}"#,
                reason
            );
            let chunk: ChatCompletionChunk = serde_json::from_str(&json).unwrap();
            assert_eq!(chunk.choices[0].finish_reason, Some(reason.to_string()));
        }
    }

    #[test]
    fn test_finish_reason_null_during_streaming() {
        let json = r#"{"id":"test","object":"chat.completion.chunk","created":123,"model":"gpt-4o","choices":[{"index":0,"delta":{"content":"streaming..."},"finish_reason":null}]}"#;
        let chunk: ChatCompletionChunk = serde_json::from_str(json).unwrap();
        assert!(chunk.choices[0].finish_reason.is_none());
        assert!(chunk.choices[0].delta.content.is_some());
    }
}