use stream_rs::accumulators::openai::OpenAiAccumulator;
use stream_rs::sse::SseParser;
fn wire_chunks() -> Vec<&'static [u8]> {
vec![
b"data: {\"choices\":[{\"index\":0,\"delta\":{\"role\":\"assistant\"}}]}\r",
b"\n\r\n",
b"data: {\"choices\":[{\"index\":0,\"delta\":{\"content\":\"The weather\"}}]}\n\n",
b"data: {\"choices\":[{\"index\":0,\"delta\":{\"content\":\" in \"}}]}\n\n",
b"data: {\"choices\":[{\"index\":0,\"delta\":{\"content\":\"Par",
b"is\"}}]}\n\n",
b"data: {\"choices\":[{\"index\":0,\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"call_1\",\"function\":{\"name\":\"get_weather\",\"arguments\":\"{\\\"ci\"}}]}}]}\n\n",
b"data: {\"choices\":[{\"index\":0,\"delta\":{\"tool_calls\":[{\"index\":0,\"function\":{\"arguments\":\"ty\\\":\\\"Paris\\\"}\"}}]}}]}\n\n",
b"data: {\"choices\":[{\"index\":0,\"delta\":{},\"finish_reason\":\"tool_calls\"}]}\n\n",
b"data: [DONE]\n\n",
]
}
fn main() {
let mut parser = SseParser::new();
let mut acc = OpenAiAccumulator::new();
let mut events = Vec::new();
for chunk in wire_chunks() {
events.clear();
parser.feed(chunk, &mut events);
for ev in &events {
if ev.data == "[DONE]" {
continue;
}
apply_chunk(&ev.data, &mut acc);
}
}
parser.finish(&mut events);
let choice = acc.choice(0).expect("a choice was streamed");
println!("role : {:?}", choice.role.as_deref());
println!("content : {:?}", choice.content);
println!("finish_reason : {:?}", choice.finish_reason.as_deref());
for (i, tc) in &choice.tool_calls {
println!(
"tool_call[{i}] : id={:?} name={:?} args={}",
tc.id.as_deref(),
tc.name.as_deref(),
tc.arguments
);
}
assert_eq!(choice.content, "The weather in Paris");
assert_eq!(choice.finish_reason.as_deref(), Some("tool_calls"));
assert_eq!(choice.tool_calls[&0].arguments, r#"{"city":"Paris"}"#);
println!("\nOK: stream reassembled correctly across ragged chunk boundaries.");
}
fn apply_chunk(json: &str, acc: &mut OpenAiAccumulator) {
let index = 0;
if let Some(role) = scalar_after(json, "\"role\":\"") {
acc.push_role(index, &role);
}
if let Some(content) = scalar_after(json, "\"content\":\"") {
acc.push_content(index, &unescape(&content));
}
if let Some(reason) = scalar_after(json, "\"finish_reason\":\"") {
acc.set_finish_reason(index, &reason);
}
if json.contains("\"tool_calls\"") {
let id = scalar_after(json, "\"id\":\"");
let name = scalar_after(json, "\"name\":\"");
let args = scalar_after(json, "\"arguments\":\"").map(|s| unescape(&s));
acc.push_tool_call(index, 0, id.as_deref(), name.as_deref(), args.as_deref());
}
}
fn scalar_after(haystack: &str, marker: &str) -> Option<String> {
let start = haystack.find(marker)? + marker.len();
let bytes = haystack.as_bytes();
let mut i = start;
let mut out = String::new();
while i < bytes.len() {
match bytes[i] {
b'\\' if i + 1 < bytes.len() => {
out.push('\\');
out.push(bytes[i + 1] as char);
i += 2;
}
b'"' => return Some(out),
b => {
out.push(b as char);
i += 1;
}
}
}
Some(out)
}
fn unescape(s: &str) -> String {
s.replace("\\\"", "\"").replace("\\\\", "\\")
}