use futures::StreamExt;
use llmkit_core::{ChatStream, LlmError, StreamDelta, TokenUsage};
use crate::types::ChatResponseBody;
/// Turns a newline-delimited JSON (NDJSON) HTTP response body into a [`ChatStream`].
///
/// The body is read chunk by chunk and split on `\n`. Every non-empty line is
/// deserialized as a [`ChatResponseBody`] and mapped to stream items:
///
/// * a non-empty `message.content` yields `StreamDelta::Text`;
/// * each entry of `message.tool_calls` yields `StreamDelta::ToolCall` with
///   `id: None`, the function name, and the arguments rendered via `to_string()`;
/// * an object with `done` set stops reading and records a [`TokenUsage`] built
///   from `prompt_eval_count` and `eval_count` (missing counts are treated as 0).
///
/// After the loop the stream yields a single `StreamDelta::Done { usage }`. If the
/// body ends without any `done` object, `usage` is `TokenUsage::default()`.
///
/// # Errors
///
/// Errors are delivered as stream items rather than returned:
///
/// * a transport error while reading the body yields `LlmError::Stream` and ends
///   the stream immediately, without a trailing `Done`;
/// * a line that is not valid JSON for [`ChatResponseBody`] yields
///   `LlmError::Stream("malformed ndjson: ...")` and parsing continues with the
///   next line.
pub(crate) fn parse(resp: reqwest::Response) -> ChatStream {
    let stream = async_stream::stream! {
        let mut bytes = resp.bytes_stream();
        let mut buf: Vec<u8> = Vec::new();
        let mut usage = TokenUsage::default();
        let mut eof = false;

        'outer: while !eof {
            let next = bytes.next().await;
            match next {
                Some(Ok(chunk)) => buf.extend_from_slice(&chunk),
                Some(Err(e)) => {
                    yield Err(LlmError::Stream(e.to_string()));
                    return;
                }
                None => {
                    eof = true;
                    // The body may end without a final newline. Terminate the
                    // leftover bytes so the line loop below still parses them
                    // instead of silently dropping the last object.
                    if !buf.is_empty() {
                        buf.push(b'\n');
                    }
                }
            }

            // Drain every complete line currently buffered; a partial line stays
            // in `buf` until more bytes (or end-of-body) arrive.
            while let Some(pos) = buf.iter().position(|&b| b == b'\n') {
                let line: Vec<u8> = buf.drain(..=pos).collect();
                // Drop the `\n` terminator, plus a `\r` if the sender used CRLF,
                // so blank CRLF lines are skipped rather than reported as malformed.
                let line = &line[..pos];
                let line = line.strip_suffix(b"\r").unwrap_or(line);
                if line.is_empty() {
                    continue;
                }

                let obj: ChatResponseBody = match serde_json::from_slice(line) {
                    Ok(o) => o,
                    Err(e) => {
                        // Report the bad line but keep going with the next one.
                        yield Err(LlmError::Stream(format!("malformed ndjson: {e}")));
                        continue;
                    }
                };

                if let Some(msg) = &obj.message {
                    if !msg.content.is_empty() {
                        yield Ok(StreamDelta::Text(msg.content.clone()));
                    }
                    if let Some(calls) = &msg.tool_calls {
                        for c in calls {
                            yield Ok(StreamDelta::ToolCall {
                                id: None,
                                name: Some(c.function.name.clone()),
                                input_delta: c.function.arguments.to_string(),
                            });
                        }
                    }
                }

                if obj.done {
                    usage = TokenUsage::new(
                        obj.prompt_eval_count.unwrap_or(0),
                        obj.eval_count.unwrap_or(0),
                    );
                    // Anything buffered after the terminal object is ignored.
                    break 'outer;
                }
            }
        }

        yield Ok(StreamDelta::Done { usage });
    };
    Box::pin(stream)
}