use stream_rs::accumulators::anthropic::{AnthropicAccumulator, BlockKind};
use stream_rs::sse::SseParser;
fn wire_chunks() -> Vec<&'static [u8]> {
vec![
b"event: message_start\ndata: {\"type\":\"message_start\"}\n\n",
b"event: content_block_start\ndata: {\"index\":0,\"content_block\":{\"type\":\"text\"}}\n\n",
b"event: content_block_delta\ndata: {\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"Hello",
b"\"}}\n\n", b"event: content_block_delta\ndata: {\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\", world\"}}\n\n",
b"event: content_block_stop\ndata: {\"index\":0}\n\n",
b"event: content_block_start\ndata: {\"index\":1,\"content_block\":{\"type\":\"tool_use\"}}\n\n",
b"event: content_block_delta\ndata: {\"index\":1,\"delta\":{\"type\":\"input_json_delta\",\"partial_json\":\"{\\\"x\\\":\"}}\n\n",
b"event: content_block_delta\ndata: {\"index\":1,\"delta\":{\"type\":\"input_json_delta\",\"partial_json\":\"1}\"}}\n\n",
b"event: content_block_stop\ndata: {\"index\":1}\n\n",
b"event: message_delta\ndata: {\"delta\":{\"stop_reason\":\"end_turn\"}}\n\n",
b"event: message_stop\ndata: {\"type\":\"message_stop\"}\n\n",
]
}
fn main() {
let mut parser = SseParser::new();
let mut acc = AnthropicAccumulator::new();
let mut events = Vec::new();
for chunk in wire_chunks() {
events.clear();
parser.feed(chunk, &mut events);
for ev in &events {
dispatch(ev.event.as_deref().unwrap_or("message"), &ev.data, &mut acc);
}
}
parser.finish(&mut events);
for (i, block) in acc.blocks() {
println!(
"block[{i}] kind={:?} stopped={} text={:?}",
block.kind, block.stopped, block.text
);
}
println!("stop_reason : {:?}", acc.stop_reason());
assert_eq!(acc.block(0).unwrap().text, "Hello, world");
assert_eq!(acc.block(1).unwrap().text, r#"{"x":1}"#);
assert_eq!(acc.stop_reason(), Some("end_turn"));
println!("\nOK: message reassembled and block ordering validated.");
}
fn dispatch(event: &str, data: &str, acc: &mut AnthropicAccumulator) {
match event {
"message_start" => acc.message_start(),
"content_block_start" => {
let index = usize_after(data, "\"index\":").unwrap_or(0);
let kind = if data.contains("\"tool_use\"") {
BlockKind::ToolUse
} else {
BlockKind::Text
};
acc.content_block_start(index, kind).unwrap();
}
"content_block_delta" => {
let index = usize_after(data, "\"index\":").unwrap_or(0);
if let Some(text) = string_after(data, "\"text\":\"") {
acc.text_delta(index, &unescape(&text)).unwrap();
} else if let Some(json) = string_after(data, "\"partial_json\":\"") {
acc.input_json_delta(index, &unescape(&json)).unwrap();
}
}
"content_block_stop" => {
let index = usize_after(data, "\"index\":").unwrap_or(0);
acc.content_block_stop(index).unwrap();
}
"message_delta" => {
acc.message_delta(string_after(data, "\"stop_reason\":\"").as_deref());
}
"message_stop" => acc.message_stop(),
_ => {}
}
}
fn usize_after(haystack: &str, marker: &str) -> Option<usize> {
let start = haystack.find(marker)? + marker.len();
let rest = &haystack[start..];
let end = rest
.find(|c: char| !c.is_ascii_digit())
.unwrap_or(rest.len());
rest[..end].parse().ok()
}
fn string_after(haystack: &str, marker: &str) -> Option<String> {
let start = haystack.find(marker)? + marker.len();
let bytes = haystack.as_bytes();
let mut i = start;
let mut out = String::new();
while i < bytes.len() {
match bytes[i] {
b'\\' if i + 1 < bytes.len() => {
out.push('\\');
out.push(bytes[i + 1] as char);
i += 2;
}
b'"' => return Some(out),
b => {
out.push(b as char);
i += 1;
}
}
}
Some(out)
}
fn unescape(s: &str) -> String {
s.replace("\\\"", "\"").replace("\\\\", "\\")
}