use crate::chat::{ChatEvent, ToolCall};
use anyhow::{Context, Result};
use tokio::sync::mpsc::Sender;
/// Collects streamed tool-call fragments so they can later be turned into
/// complete [`ToolCall`] values via `finalize`.
#[derive(Debug, Default)]
pub(super) struct ToolCallAccumulator {
/// One entry per tool-call `index` seen in the stream. `None` marks an index
/// that no delta has written to yet; a populated entry is the tuple
/// `(id, name, arguments)`, where `arguments` is the concatenation of every
/// argument fragment received for that index so far.
slots: Vec<Option<(String, String, String)>>,
}
impl ToolCallAccumulator {
    /// Upper bound on the number of tool-call slots tracked for one response.
    ///
    /// The `index` of a delta comes straight from the network. Without a bound,
    /// a single delta carrying a huge index would force an equally huge
    /// allocation while the slot vector is grown to reach it.
    const MAX_SLOTS: usize = 1024;

    /// Merges one streamed `tool_calls` delta into the accumulator.
    ///
    /// `tool_calls` is expected to be a JSON array; any other value is ignored.
    /// For each element:
    ///
    /// * `index` selects the slot to update. A missing or non-`u64` index is
    ///   treated as `0`. An index at or beyond [`Self::MAX_SLOTS`] (or one that
    ///   does not fit in `usize`) causes that element to be skipped.
    /// * a non-empty `id` replaces the slot's id;
    /// * a non-empty `function.name` replaces the slot's name;
    /// * `function.arguments` (possibly empty) is appended to the slot's
    ///   argument buffer, since arguments arrive as string fragments.
    pub(super) fn apply_delta(&mut self, tool_calls: &serde_json::Value) {
        let Some(entries) = tool_calls.as_array() else {
            return;
        };

        for entry in entries {
            let raw_index = entry
                .get("index")
                .and_then(serde_json::Value::as_u64)
                .unwrap_or(0);
            // Checked conversion + bound: never let remote input pick the
            // vector length unchecked (and never truncate on 32-bit targets).
            let idx = match usize::try_from(raw_index) {
                Ok(i) if i < Self::MAX_SLOTS => i,
                _ => continue,
            };

            if self.slots.len() <= idx {
                self.slots.resize_with(idx + 1, || None);
            }
            let (id, name, arguments) = self.slots[idx].get_or_insert_with(Default::default);

            if let Some(new_id) = entry
                .get("id")
                .and_then(serde_json::Value::as_str)
                .filter(|s| !s.is_empty())
            {
                *id = new_id.to_string();
            }

            let Some(func) = entry.get("function") else {
                continue;
            };
            if let Some(new_name) = func
                .get("name")
                .and_then(serde_json::Value::as_str)
                .filter(|s| !s.is_empty())
            {
                *name = new_name.to_string();
            }
            if let Some(fragment) = func.get("arguments").and_then(serde_json::Value::as_str) {
                arguments.push_str(fragment);
            }
        }
    }

    /// Consumes the accumulator and returns the collected tool calls in slot
    /// (index) order.
    ///
    /// Slots that were never written, and slots that never received a
    /// function name, are dropped.
    pub(super) fn finalize(self) -> Vec<ToolCall> {
        self.slots
            .into_iter()
            .flatten()
            .filter(|(_, name, _)| !name.is_empty())
            .map(|(id, name, arguments)| ToolCall {
                id,
                name,
                arguments,
            })
            .collect()
    }
}
pub(super) async fn pump_openai_sse(resp: reqwest::Response, tx: Sender<ChatEvent>) -> Result<()> {
use futures_util::StreamExt;
let mut acc = ToolCallAccumulator::default();
let mut buf = String::new();
let mut stream = resp.bytes_stream();
while let Some(chunk) = stream.next().await {
let bytes = chunk.context("read chat stream chunk")?;
let text = match std::str::from_utf8(&bytes) {
Ok(s) => s,
Err(_) => continue,
};
buf.push_str(text);
while let Some(idx) = buf.find('\n') {
let line: String = buf.drain(..=idx).collect();
let line = line.trim();
let Some(payload) = line.strip_prefix("data:").map(str::trim) else {
continue;
};
if payload.is_empty() {
continue;
}
if payload == "[DONE]" {
for call in std::mem::take(&mut acc).finalize() {
if tx.send(ChatEvent::ToolCall(call)).await.is_err() {
return Ok(());
}
}
let _ = tx.send(ChatEvent::Done).await;
return Ok(());
}
let v: serde_json::Value = match serde_json::from_str(payload) {
Ok(v) => v,
Err(_) => continue,
};
let delta = v
.get("choices")
.and_then(|c| c.get(0))
.and_then(|c| c.get("delta"));
if let Some(delta) = delta {
let content_opt = delta
.get("content")
.and_then(|c| c.as_str())
.filter(|s| !s.is_empty())
.map(|s| s.to_string());
let send_err = match content_opt {
Some(content) => tx.send(ChatEvent::Delta(content)).await.is_err(),
None => false,
};
if send_err {
return Ok(());
}
if let Some(tc) = delta.get("tool_calls") {
acc.apply_delta(tc);
}
}
}
}
for call in acc.finalize() {
if tx.send(ChatEvent::ToolCall(call)).await.is_err() {
return Ok(());
}
}
let _ = tx.send(ChatEvent::Done).await;
Ok(())
}