use serde::Deserialize;
use crate::error::Result;
use crate::stream::{StopReason, StreamChunk};
use crate::usage::Usage;
#[derive(Debug, Clone, Deserialize)]
struct OpenAIStreamChunk {
pub choices: Vec<OpenAIStreamChoice>,
#[serde(default)]
pub usage: Option<Usage>,
}
#[derive(Debug, Clone, Deserialize)]
struct OpenAIStreamChoice {
pub delta: OpenAIStreamDelta,
pub finish_reason: Option<String>,
}
#[derive(Debug, Clone, Default, Deserialize)]
struct OpenAIStreamDelta {
pub content: Option<String>,
pub tool_calls: Option<Vec<OpenAIStreamToolCall>>,
}
#[derive(Debug, Clone, Deserialize)]
struct OpenAIStreamToolCall {
pub index: usize,
#[serde(default)]
pub id: Option<String>,
pub function: Option<OpenAIStreamFunctionCall>,
}
#[derive(Debug, Clone, Deserialize)]
struct OpenAIStreamFunctionCall {
pub name: Option<String>,
pub arguments: Option<String>,
}
pub fn parse_sse_events(text: &str) -> Vec<Result<StreamChunk>> {
let mut results = Vec::new();
for line in text.lines() {
let line = line.trim();
if line.is_empty() || line.starts_with(':') {
continue;
}
if let Some(data) = line.strip_prefix("data: ") {
let data = data.trim();
if data == "[DONE]" {
results.push(Ok(StreamChunk::done(Some(StopReason::Stop))));
continue;
}
match serde_json::from_str::<OpenAIStreamChunk>(data) {
Ok(chunk) => {
results.extend(convert_chunk(&chunk));
}
Err(e) => {
tracing::warn!("Failed to parse SSE chunk: {e}, data: {data}");
}
}
}
}
results
}
fn convert_chunk(chunk: &OpenAIStreamChunk) -> Vec<Result<StreamChunk>> {
let mut results = Vec::new();
for choice in &chunk.choices {
if let Some(content) = &choice.delta.content
&& !content.is_empty()
{
results.push(Ok(StreamChunk::text(content)));
}
if let Some(tool_calls) = &choice.delta.tool_calls {
for tc in tool_calls {
if let (Some(id), Some(func)) = (&tc.id, &tc.function)
&& let Some(name) = &func.name
{
results.push(Ok(StreamChunk::tool_use_start(tc.index, id, name)));
}
if let Some(func) = &tc.function
&& let Some(args) = &func.arguments
&& !args.is_empty()
{
results.push(Ok(StreamChunk::tool_use_delta(tc.index, args)));
}
}
}
if let Some(reason) = &choice.finish_reason {
let stop_reason = StopReason::parse(reason);
if matches!(stop_reason, StopReason::ToolCalls) {
if let Some(tool_calls) = &choice.delta.tool_calls {
for tc in tool_calls {
results.push(Ok(StreamChunk::ToolUseComplete { index: tc.index }));
}
}
}
}
}
if let Some(usage) = chunk.usage {
results.push(Ok(StreamChunk::Usage(usage)));
}
results
}