use eventsource_stream::Eventsource;
use futures::StreamExt;
use llmkit_core::{ChatStream, LlmError, StreamDelta, TokenUsage};
use crate::types::{StreamContentBlock, StreamDeltaBlock, StreamEvent};
pub(crate) fn parse(resp: reqwest::Response) -> ChatStream {
let stream = async_stream::stream! {
let mut events = resp.bytes_stream().eventsource();
let mut usage = TokenUsage::default();
while let Some(event) = events.next().await {
let event = match event {
Ok(e) => e,
Err(e) => {
yield Err(LlmError::Stream(e.to_string()));
return;
}
};
let parsed: StreamEvent = match serde_json::from_str(&event.data) {
Ok(p) => p,
Err(e) => {
yield Err(LlmError::Stream(format!("malformed event: {e}")));
continue;
}
};
match parsed {
StreamEvent::MessageStart { message } => {
if let Some(u) = message.usage {
usage.prompt = u.input_tokens;
}
}
StreamEvent::ContentBlockStart { content_block, .. } => {
if let StreamContentBlock::ToolUse { id, name } = content_block {
yield Ok(StreamDelta::ToolCall {
id: Some(id),
name: Some(name),
input_delta: String::new(),
});
}
}
StreamEvent::ContentBlockDelta { delta, .. } => match delta {
StreamDeltaBlock::TextDelta { text } => {
yield Ok(StreamDelta::Text(text));
}
StreamDeltaBlock::InputJsonDelta { partial_json } => {
yield Ok(StreamDelta::ToolCall {
id: None,
name: None,
input_delta: partial_json,
});
}
StreamDeltaBlock::Unknown => {}
},
StreamEvent::MessageDelta { usage: u } => {
if let Some(u) = u {
usage.completion = u.output_tokens;
if u.input_tokens > 0 {
usage.prompt = u.input_tokens;
}
}
}
StreamEvent::Error { error } => {
yield Err(LlmError::Stream(error.message));
return;
}
StreamEvent::MessageStop => break,
StreamEvent::ContentBlockStop { .. }
| StreamEvent::Ping
| StreamEvent::Unknown => {}
}
}
yield Ok(StreamDelta::Done { usage });
};
Box::pin(stream)
}