use eventsource_stream::Eventsource;
use futures::StreamExt;
use llmkit_core::{ChatStream, LlmError, StreamDelta, TokenUsage};
use crate::types::ChatCompletionChunk;
/// Adapts a streaming HTTP response into a [`ChatStream`].
///
/// The response body is decoded as server-sent events and the data of each
/// event is deserialized as a [`ChatCompletionChunk`]. The returned stream
/// yields:
///
/// - `StreamDelta::Text` for every non-empty content delta,
/// - `StreamDelta::ToolCall` for every tool-call entry in a delta (an entry
///   without a `function` produces no name and an empty `input_delta`),
/// - a final `StreamDelta::Done` carrying the most recently reported token
///   usage, or `TokenUsage::default()` when no chunk reported any.
///
/// Error behaviour:
///
/// - An error produced by the event stream itself is yielded once as
///   `LlmError::Stream`, after which the stream ends without a `Done` item.
/// - An event whose data cannot be deserialized is reported as
///   `LlmError::Stream` and skipped; subsequent events are still processed.
pub(crate) fn parse(resp: reqwest::Response) -> ChatStream {
    let deltas = async_stream::stream! {
        let mut sse = resp.bytes_stream().eventsource();
        // Overwritten whenever a chunk carries usage; reported with `Done`.
        let mut last_usage = TokenUsage::default();

        loop {
            let next = sse.next().await;
            let message = match next {
                // Body exhausted without an explicit terminator.
                None => break,
                Some(Ok(message)) => message,
                Some(Err(err)) => {
                    // The event stream is unusable past this point: bail out
                    // without emitting `Done`.
                    yield Err(LlmError::Stream(err.to_string()));
                    return;
                }
            };

            // Sentinel payload marking the end of the completion.
            if message.data == "[DONE]" {
                break;
            }

            let parsed = serde_json::from_str::<ChatCompletionChunk>(&message.data);
            let chunk = match parsed {
                Ok(chunk) => chunk,
                Err(err) => {
                    // Surface the bad payload but keep consuming later events.
                    yield Err(LlmError::Stream(format!("malformed chunk: {err}")));
                    continue;
                }
            };

            if let Some(reported) = chunk.usage {
                last_usage = TokenUsage::new(reported.prompt_tokens, reported.completion_tokens);
            }

            for choice in chunk.choices {
                // Empty text fragments are dropped rather than forwarded.
                if let Some(text) = choice.delta.content.filter(|t| !t.is_empty()) {
                    yield Ok(StreamDelta::Text(text));
                }

                for call in choice.delta.tool_calls.into_iter().flatten() {
                    let (name, input_delta) = match call.function {
                        Some(function) => {
                            (function.name, function.arguments.unwrap_or_default())
                        }
                        None => (None, String::new()),
                    };
                    yield Ok(StreamDelta::ToolCall {
                        id: call.id,
                        name,
                        input_delta,
                    });
                }
            }
        }

        yield Ok(StreamDelta::Done { usage: last_usage });
    };

    Box::pin(deltas)
}