use serde_json::{Map, Value};
/// Reassembles a sequence of streamed SSE events into the JSON text of one complete response.
///
/// If any event name starts with `response.`, the whole stream is handed to
/// `reassemble_responses`. Otherwise every event's `data` is parsed as a JSON chunk and merged:
///
/// * the first chunk, with `choices` and `usage` removed and `.chunk` stripped out of its
///   `object` string, supplies the top-level fields of the result;
/// * the last non-null `usage` seen in any chunk is kept;
/// * choices are grouped by `index` (a missing index counts as 0): `text` fragments are
///   concatenated, `delta` fields are merged into a `message` object, and the last non-null
///   `finish_reason` is kept (`null` is emitted when none was seen).
///
/// Events whose data is empty or equal to `[DONE]` are skipped.
///
/// # Errors
///
/// Returns an error when an event's data is not valid JSON, or when the first chunk is valid
/// JSON that is neither an object nor `null`.
pub fn reassemble(events: &[eventsource_stream::Event]) -> anyhow::Result<String> {
    if events.iter().any(|e| e.event.starts_with("response.")) {
        return reassemble_responses(events);
    }

    let mut base: Option<Value> = None;
    // A BTreeMap keeps the assembled choices ordered by their index.
    let mut choices: std::collections::BTreeMap<u64, Map<String, Value>> = Default::default();
    let mut usage = Value::Null;

    for event in events {
        if event.data.is_empty() || event.data == "[DONE]" {
            continue;
        }
        let chunk: Value = serde_json::from_str(&event.data)
            .map_err(|e| anyhow::anyhow!("Invalid chunk JSON: {}", e))?;

        // The first chunk doubles as the template for the top-level response fields.
        if base.is_none() {
            let mut template = chunk.clone();
            if let Some(fields) = template.as_object_mut() {
                if let Some(Value::String(kind)) = fields.get_mut("object") {
                    *kind = kind.replace(".chunk", "");
                }
                fields.remove("choices");
                fields.remove("usage");
            }
            base = Some(template);
        }

        if !chunk["usage"].is_null() {
            usage = chunk["usage"].clone();
        }

        let Some(chunk_choices) = chunk["choices"].as_array() else {
            continue;
        };
        for choice in chunk_choices {
            let index = choice["index"].as_u64().unwrap_or(0);
            let merged = choices.entry(index).or_default();

            let finish_reason = &choice["finish_reason"];
            if !finish_reason.is_null() {
                merged.insert("finish_reason".to_string(), finish_reason.clone());
            }

            // Chunks carrying a plain `text` string: append the fragment.
            if let Some(text) = choice["text"].as_str() {
                if let Value::String(acc) = merged
                    .entry("text".to_string())
                    .or_insert_with(|| Value::String(String::new()))
                {
                    acc.push_str(text);
                }
            }

            // Chunks carrying a `delta` object: fold it into `message`.
            let Some(delta) = choice["delta"].as_object() else {
                continue;
            };
            let message = merged
                .entry("message".to_string())
                .or_insert_with(|| Value::Object(Map::new()));
            let Value::Object(msg) = message else {
                continue;
            };
            for (key, value) in delta.iter().filter(|(_, v)| !v.is_null()) {
                match key.as_str() {
                    "tool_calls" => merge_tool_calls(msg, value),
                    _ => merge_delta_field(msg, key, value),
                }
            }
        }
    }

    // Work on a `Map` rather than indexing a `Value`: mutable indexing of a non-object,
    // non-null `Value` panics, and malformed input should surface as an error instead.
    let mut response = match base {
        None | Some(Value::Null) => Map::new(),
        Some(Value::Object(fields)) => fields,
        Some(other) => anyhow::bail!("Invalid chunk: expected a JSON object, got {}", other),
    };

    let assembled_choices: Vec<Value> = choices
        .into_iter()
        .map(|(index, mut fields)| {
            fields.insert("index".to_string(), Value::Number(index.into()));
            fields
                .entry("finish_reason".to_string())
                .or_insert(Value::Null);
            Value::Object(fields)
        })
        .collect();

    response.insert("choices".to_string(), Value::Array(assembled_choices));
    response.insert("usage".to_string(), usage);
    Ok(Value::Object(response).to_string())
}
fn reassemble_responses(events: &[eventsource_stream::Event]) -> anyhow::Result<String> {
for event in events.iter().rev() {
if event.event == "response.completed" {
let parsed: Value = serde_json::from_str(&event.data)
.map_err(|e| anyhow::anyhow!("Invalid response.completed JSON: {}", e))?;
if let Some(response) = parsed.get("response") {
return serde_json::to_string(response).map_err(Into::into);
}
anyhow::bail!(
"response.completed event JSON does not contain top-level \"response\" field"
);
}
}
anyhow::bail!("No response.completed event found in Responses API SSE stream")
}
/// Folds one `tool_calls` delta array into the `tool_calls` array of `msg`.
///
/// Each delta entry is routed to the slot given by its `index` (a missing index counts as 0);
/// the array is padded with empty objects up to that slot. Non-null `id` and `type` values
/// overwrite what the slot holds, while string `name` and `arguments` values inside `function`
/// are appended to what has been accumulated so far. A `value` that is not an array is ignored.
fn merge_tool_calls(msg: &mut Map<String, Value>, value: &Value) {
    let Some(deltas) = value.as_array() else {
        return;
    };
    let Value::Array(calls) = msg
        .entry("tool_calls".to_string())
        .or_insert_with(|| Value::Array(vec![]))
    else {
        return;
    };

    for delta in deltas {
        let position = delta["index"].as_u64().unwrap_or(0) as usize;
        if calls.len() <= position {
            calls.resize_with(position + 1, || Value::Object(Map::new()));
        }
        // Every element of `calls` is an object created by the padding above.
        let call = calls[position].as_object_mut().unwrap();

        // Identifier-like fields: the latest non-null value wins.
        for field in ["id", "type"] {
            match delta.get(field) {
                Some(v) if !v.is_null() => {
                    call.insert(field.to_string(), v.clone());
                }
                _ => {}
            }
        }

        let Some(function_delta) = delta["function"].as_object() else {
            continue;
        };
        // `function` is only ever inserted here, and always as an object.
        let function = call
            .entry("function".to_string())
            .or_insert_with(|| Value::Object(Map::new()))
            .as_object_mut()
            .unwrap();

        // String fields of the function call arrive in pieces and are concatenated.
        for field in ["name", "arguments"] {
            let Some(fragment) = function_delta.get(field).and_then(Value::as_str) else {
                continue;
            };
            let accumulated = function
                .entry(field.to_string())
                .or_insert_with(|| Value::String(String::new()));
            if let Value::String(text) = accumulated {
                text.push_str(fragment);
            }
        }
    }
}
/// Merges a single non-`tool_calls` delta field into the message being assembled.
///
/// String values are appended to the string already stored under `key`, except for `role`,
/// which is overwritten. Non-string values replace whatever is stored under `key`.
/// If `key` already holds a non-string value, an incoming string fragment is dropped.
fn merge_delta_field(msg: &mut Map<String, Value>, key: &str, value: &Value) {
    match value {
        Value::String(fragment) if key != "role" => {
            let slot = msg
                .entry(key.to_string())
                .or_insert_with(|| Value::String(String::new()));
            if let Value::String(accumulated) = slot {
                accumulated.push_str(fragment);
            }
        }
        // `role` and every non-string value: the latest value wins.
        _ => {
            msg.insert(key.to_string(), value.clone());
        }
    }
}
// Fixture-driven tests for `reassemble`.
//
// Fixtures live under `<CARGO_MANIFEST_DIR>/fixtures/<provider>/<case>.json`, and the list of
// cases is read from `<CARGO_MANIFEST_DIR>/test_cases.json`. Each fixture stores the recorded
// stream (`chunks` or `events`) next to the `expected` non-streaming response; the tests
// reassemble the stream and compare the result with `expected`.
#[cfg(test)]
mod tests {
use super::*;
use std::path::PathBuf;
use std::sync::Once;
// Guards fixture recording so it runs at most once per test process.
static GENERATE: Once = Once::new();
/// Records fresh fixtures when `BASE_URL`, `MODEL` and `FIXTURE_NAME` are all set in the
/// environment; otherwise does nothing.
///
/// `API_KEY` is optional and falls back to `"none"`. Fixtures are written to
/// `fixtures/<FIXTURE_NAME>/`. Cases whose endpoint ends with `/responses` are recorded with
/// `record_responses_fixture`, all others with `record_fixture`.
///
/// Panics if the fixture directory cannot be created, `test_cases.json` cannot be read or
/// parsed, or a case has no string `endpoint`.
fn ensure_fixtures() {
GENERATE.call_once(|| {
// Recording is opt-in: without all three variables this is a no-op.
let (Ok(base_url), Ok(model), Ok(fixture_name)) = (
std::env::var("BASE_URL"),
std::env::var("MODEL"),
std::env::var("FIXTURE_NAME"),
) else {
return;
};
let api_key = std::env::var("API_KEY").unwrap_or_else(|_| "none".to_string());
let root = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
let fixtures_dir = root.join("fixtures").join(&fixture_name);
std::fs::create_dir_all(&fixtures_dir).unwrap();
let cases: Value = serde_json::from_str(
&std::fs::read_to_string(root.join("test_cases.json")).unwrap(),
)
.unwrap();
// The recorders are async; drive them to completion one case at a time.
let rt = tokio::runtime::Runtime::new().unwrap();
let client = reqwest::Client::new();
for (name, case) in cases.as_object().unwrap() {
let endpoint = case["endpoint"].as_str().unwrap();
if endpoint.ends_with("/responses") {
rt.block_on(record_responses_fixture(
&client,
&base_url,
&api_key,
&model,
name,
case,
&fixtures_dir,
));
} else {
rt.block_on(record_fixture(
&client,
&base_url,
&api_key,
&model,
name,
case,
&fixtures_dir,
));
}
}
});
}
/// Returns the sorted names of all sub-directories of `fixtures/`.
///
/// Panics if the `fixtures/` directory cannot be read.
fn fixture_providers() -> Vec<String> {
let root = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
let fixtures_dir = root.join("fixtures");
let mut providers: Vec<String> = std::fs::read_dir(&fixtures_dir)
.unwrap()
.filter_map(|entry| {
// Unreadable entries and non-directories are skipped.
let entry = entry.ok()?;
if entry.file_type().ok()?.is_dir() {
Some(entry.file_name().to_string_lossy().to_string())
} else {
None
}
})
.collect();
providers.sort();
providers
}
/// Records one fixture for a case whose endpoint does not end with `/responses`.
///
/// The case body is sent twice to `{base_url}{endpoint}` with `model` overridden and
/// `temperature = 0`, `seed = 42` added: once with `stream = false` (stored as `expected`)
/// and once with `stream = true` plus `stream_options.include_usage = true` (stored as
/// `chunks`). The result is written to `<fixtures_dir>/<name>.json`.
///
/// Panics if a request fails, the non-streaming body is not JSON, or the file cannot be
/// written.
async fn record_fixture(
client: &reqwest::Client,
base_url: &str,
api_key: &str,
model: &str,
name: &str,
case: &Value,
fixtures_dir: &PathBuf,
) {
let endpoint = case["endpoint"].as_str().unwrap();
let url = format!("{base_url}{endpoint}");
let mut body = case["body"].as_object().unwrap().clone();
body.insert("model".to_string(), Value::String(model.to_string()));
body.insert("temperature".to_string(), Value::Number(0.into()));
body.insert("seed".to_string(), Value::Number(42.into()));
let mut non_stream_body = body.clone();
non_stream_body.insert("stream".to_string(), Value::Bool(false));
eprintln!("[{name}] POST {url} (non-streaming)");
let expected: Value = client
.post(&url)
.bearer_auth(api_key)
.json(&non_stream_body)
.send()
.await
.unwrap_or_else(|e| panic!("{name}: non-streaming request failed: {e}"))
.json()
.await
.unwrap_or_else(|e| panic!("{name}: non-streaming parse failed: {e}"));
eprintln!("[{name}] non-streaming response received");
let mut stream_body = body.clone();
stream_body.insert("stream".to_string(), Value::Bool(true));
let mut stream_opts = serde_json::Map::new();
stream_opts.insert("include_usage".to_string(), Value::Bool(true));
stream_body.insert("stream_options".to_string(), Value::Object(stream_opts));
eprintln!("[{name}] POST {url} (streaming)");
// The whole stream is read as text and split into lines afterwards.
let response_text = client
.post(&url)
.bearer_auth(api_key)
.json(&stream_body)
.send()
.await
.unwrap_or_else(|e| panic!("{name}: streaming request failed: {e}"))
.text()
.await
.unwrap_or_else(|e| panic!("{name}: streaming read failed: {e}"));
let mut chunks: Vec<Value> = vec![];
// Only lines starting with `data: ` are kept. `[DONE]` is stored as a JSON string;
// payloads that do not parse as JSON are dropped silently.
for line in response_text.lines() {
if let Some(data) = line.strip_prefix("data: ") {
if data == "[DONE]" {
chunks.push(Value::String("[DONE]".to_string()));
} else if let Ok(parsed) = serde_json::from_str::<Value>(data) {
chunks.push(parsed);
}
}
}
eprintln!("[{name}] streaming response: {} chunks", chunks.len());
let fixture = serde_json::json!({ "chunks": chunks, "expected": expected });
let path = fixtures_dir.join(format!("{name}.json"));
std::fs::write(
&path,
serde_json::to_string_pretty(&fixture).unwrap() + "\n",
)
.unwrap_or_else(|e| panic!("{name}: failed to write fixture: {e}"));
eprintln!("[{name}] fixture written to {}", path.display());
}
/// Records one fixture for a case whose endpoint ends with `/responses`.
///
/// The case body (with `model` overridden and `temperature = 0`, `seed = 42` added) is sent
/// first without a `stream` key (stored as `expected`) and then with `stream = true`. The
/// streamed text is split into events, each stored as `{ "event_type", "data" }` under
/// `events`, and the fixture is written to `<fixtures_dir>/<name>.json`.
///
/// Panics if a request fails, the non-streaming body is not JSON, or the file cannot be
/// written.
async fn record_responses_fixture(
client: &reqwest::Client,
base_url: &str,
api_key: &str,
model: &str,
name: &str,
case: &Value,
fixtures_dir: &PathBuf,
) {
let endpoint = case["endpoint"].as_str().unwrap();
let url = format!("{base_url}{endpoint}");
let mut body = case["body"].as_object().unwrap().clone();
body.insert("model".to_string(), Value::String(model.to_string()));
body.insert("temperature".to_string(), Value::Number(0.into()));
body.insert("seed".to_string(), Value::Number(42.into()));
eprintln!("[{name}] POST {url} (non-streaming)");
let expected: Value = client
.post(&url)
.bearer_auth(api_key)
.json(&body)
.send()
.await
.unwrap_or_else(|e| panic!("{name}: non-streaming request failed: {e}"))
.json()
.await
.unwrap_or_else(|e| panic!("{name}: non-streaming parse failed: {e}"));
eprintln!("[{name}] non-streaming response received");
body.insert("stream".to_string(), Value::Bool(true));
eprintln!("[{name}] POST {url} (streaming)");
let response_text = client
.post(&url)
.bearer_auth(api_key)
.json(&body)
.send()
.await
.unwrap_or_else(|e| panic!("{name}: streaming request failed: {e}"))
.text()
.await
.unwrap_or_else(|e| panic!("{name}: streaming read failed: {e}"));
let mut events: Vec<Value> = vec![];
// State of the event currently being collected.
let mut current_event_type: Option<String> = None;
let mut current_data_lines: Vec<String> = Vec::new();
for raw_line in response_text.lines() {
let line = raw_line.trim_end_matches('\r');
if line.is_empty() {
// A blank line ends the current event: emit it if it carried data.
if !current_data_lines.is_empty() {
// Multiple `data:` lines of one event are joined with newlines.
let data_str = current_data_lines.join("\n");
// `[DONE]` and payloads that are not valid JSON are dropped.
if data_str != "[DONE]" {
if let Ok(parsed) = serde_json::from_str::<Value>(&data_str) {
let event_type = current_event_type.clone().unwrap_or_default();
events.push(
serde_json::json!({ "event_type": event_type, "data": parsed }),
);
}
}
}
current_event_type = None;
current_data_lines.clear();
// Field prefixes are accepted both with and without the space after the colon.
} else if let Some(event_type) = line
.strip_prefix("event: ")
.or_else(|| line.strip_prefix("event:"))
{
current_event_type = Some(event_type.to_string());
} else if let Some(data) = line
.strip_prefix("data: ")
.or_else(|| line.strip_prefix("data:"))
{
current_data_lines.push(data.to_string());
}
}
// Emit a trailing event that was not followed by a blank line.
if !current_data_lines.is_empty() {
let data_str = current_data_lines.join("\n");
if data_str != "[DONE]" {
if let Ok(parsed) = serde_json::from_str::<Value>(&data_str) {
let event_type = current_event_type.clone().unwrap_or_default();
events.push(
serde_json::json!({ "event_type": event_type, "data": parsed }),
);
}
}
}
eprintln!("[{name}] streaming response: {} events", events.len());
let fixture = serde_json::json!({ "events": events, "expected": expected });
let path = fixtures_dir.join(format!("{name}.json"));
std::fs::write(
&path,
serde_json::to_string_pretty(&fixture).unwrap() + "\n",
)
.unwrap_or_else(|e| panic!("{name}: failed to write fixture: {e}"));
eprintln!("[{name}] fixture written to {}", path.display());
}
/// Recursively compares `actual` with `expected` and appends one message per mismatch to
/// `errors`.
///
/// * `path` is the dotted/indexed location of the values being compared (`""` at the root).
/// * Object keys listed in `skip` are ignored at every depth; matching is on the bare key
///   name, not on the full path.
/// * A key that is `null` in `expected` may be absent from `actual`.
/// * Arrays must have equal length and are then compared element by element.
/// * Differing string values at a path ending in `.arguments` are accepted when both parse
///   as JSON and the parsed values are equal.
fn diff(
actual: &Value,
expected: &Value,
path: &str,
skip: &[String],
errors: &mut Vec<String>,
) {
match (actual, expected) {
(Value::Object(a), Value::Object(e)) => {
// Pass 1: every expected key must be present and equal.
for (key, ev) in e {
if skip.iter().any(|s| s == key) {
continue;
}
let p = if path.is_empty() {
key.clone()
} else {
format!("{path}.{key}")
};
match a.get(key) {
Some(av) => diff(av, ev, &p, skip, errors),
None if ev.is_null() => {} None => errors.push(format!("{p}: missing from reassembled output")),
}
}
// Pass 2: report keys present in the actual value but not in the expected one.
for key in a.keys() {
if skip.iter().any(|s| s == key) {
continue;
}
if !e.contains_key(key) {
let p = if path.is_empty() {
key.clone()
} else {
format!("{path}.{key}")
};
errors.push(format!("{p}: unexpected field in reassembled output"));
}
}
}
(Value::Array(a), Value::Array(e)) => {
// On a length mismatch the elements are not compared at all.
if a.len() != e.len() {
errors.push(format!(
"{path}: array length {}, expected {}",
a.len(),
e.len()
));
return;
}
for (i, (av, ev)) in a.iter().zip(e).enumerate() {
diff(av, ev, &format!("{path}[{i}]"), skip, errors);
}
}
// Scalars, and any pair of values with differing kinds.
_ => {
if actual != expected {
if path.ends_with(".arguments") {
// Compare as parsed JSON so that formatting differences in the encoded
// string do not count as a mismatch.
if let (Some(a), Some(e)) = (actual.as_str(), expected.as_str()) {
let ap: Result<Value, _> = serde_json::from_str(a);
let ep: Result<Value, _> = serde_json::from_str(e);
if let (Ok(ap), Ok(ep)) = (ap, ep) {
if ap == ep {
return;
}
}
}
}
errors.push(format!("{path}: got {actual}, expected {expected}"));
}
}
}
}
/// Reassembles the `chunks` of fixture `fixtures/<provider>/<name>.json` and panics with a
/// list of differences if the result does not match the fixture's `expected` value.
///
/// Keys listed under `allowed_mismatches` for this case in `test_cases.json` are excluded
/// from the comparison.
fn assert_fixture(provider: &str, name: &str) {
ensure_fixtures();
let root = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
let cases: Value =
serde_json::from_str(&std::fs::read_to_string(root.join("test_cases.json")).unwrap())
.unwrap();
let skip: Vec<String> = cases[name]["allowed_mismatches"]
.as_array()
.map(|a| a.iter().map(|v| v.as_str().unwrap().to_string()).collect())
.unwrap_or_default();
let path = root
.join("fixtures")
.join(provider)
.join(format!("{name}.json"));
let content = std::fs::read_to_string(&path)
.unwrap_or_else(|e| panic!("missing fixture {}: {e}", path.display()));
let fixture: Value = serde_json::from_str(&content).unwrap();
// Rebuild events from the stored chunks: string chunks (the `[DONE]` marker) are used
// verbatim, everything else is re-serialized to JSON text.
let events: Vec<eventsource_stream::Event> = fixture["chunks"]
.as_array()
.unwrap()
.iter()
.map(|chunk| eventsource_stream::Event {
data: if chunk.is_string() {
chunk.as_str().unwrap().to_string()
} else {
chunk.to_string()
},
..Default::default()
})
.collect();
let actual: Value = serde_json::from_str(&reassemble(&events).unwrap()).unwrap();
let mut errors = vec![];
diff(&actual, &fixture["expected"], "", &skip, &mut errors);
if !errors.is_empty() {
panic!("fixture {provider}/{name}:\n{}", errors.join("\n"));
}
}
/// Same as `assert_fixture`, but for fixtures that store named `events` (each with an
/// `event_type` and `data`) instead of `chunks`.
fn assert_responses_fixture(provider: &str, name: &str) {
ensure_fixtures();
let root = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
let cases: Value =
serde_json::from_str(&std::fs::read_to_string(root.join("test_cases.json")).unwrap())
.unwrap();
let skip: Vec<String> = cases[name]["allowed_mismatches"]
.as_array()
.map(|a| a.iter().map(|v| v.as_str().unwrap().to_string()).collect())
.unwrap_or_default();
let path = root
.join("fixtures")
.join(provider)
.join(format!("{name}.json"));
let content = std::fs::read_to_string(&path)
.unwrap_or_else(|e| panic!("missing fixture {}: {e}", path.display()));
let fixture: Value = serde_json::from_str(&content).unwrap();
// A missing or non-string `event_type` becomes an empty event name.
let events: Vec<eventsource_stream::Event> = fixture["events"]
.as_array()
.unwrap()
.iter()
.map(|ev| eventsource_stream::Event {
event: ev["event_type"]
.as_str()
.unwrap_or_default()
.to_string(),
data: ev["data"].to_string(),
..Default::default()
})
.collect();
let actual: Value = serde_json::from_str(&reassemble(&events).unwrap()).unwrap();
let mut errors = vec![];
diff(&actual, &fixture["expected"], "", &skip, &mut errors);
if !errors.is_empty() {
panic!("fixture {provider}/{name}:\n{}", errors.join("\n"));
}
}
/// Runs every case from `test_cases.json` against every provider directory under
/// `fixtures/`.
///
/// Cases without a fixture file for a provider are skipped, but each provider must have at
/// least one fixture and at least one provider directory must exist.
#[test]
fn all_fixtures() {
ensure_fixtures();
let root = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
let cases: Value =
serde_json::from_str(&std::fs::read_to_string(root.join("test_cases.json")).unwrap())
.unwrap();
let providers = fixture_providers();
assert!(
!providers.is_empty(),
"No fixture provider directories found under fixtures/"
);
for provider in &providers {
let provider_dir = root.join("fixtures").join(provider);
// Number of cases actually checked for this provider.
let mut ran = 0;
for (name, case) in cases.as_object().unwrap() {
let fixture_path = provider_dir.join(format!("{name}.json"));
if !fixture_path.exists() {
eprintln!("[skip] {provider}/{name}: fixture file not present");
continue;
}
let endpoint = case["endpoint"].as_str().unwrap();
eprintln!("[test] {provider}/{name}");
// The endpoint suffix selects the fixture layout (`events` vs `chunks`).
if endpoint.ends_with("/responses") {
assert_responses_fixture(provider, name);
} else {
assert_fixture(provider, name);
}
ran += 1;
}
assert!(ran > 0, "Provider {provider} has no fixture files");
}
}
/// A `role` repeated in every delta must be kept once, while `content` fragments are
/// concatenated in order.
#[test]
fn role_not_concatenated() {
let events: Vec<eventsource_stream::Event> = vec![
eventsource_stream::Event {
data: r#"{"id":"1","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"role":"assistant","content":"Hello"}}]}"#.to_string(),
..Default::default()
},
eventsource_stream::Event {
data: r#"{"id":"1","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"role":"assistant","content":" world"}}]}"#.to_string(),
..Default::default()
},
eventsource_stream::Event {
data: r#"{"id":"1","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"role":"assistant","content":"!"},"finish_reason":"stop"}]}"#.to_string(),
..Default::default()
},
];
let result: Value = serde_json::from_str(&reassemble(&events).unwrap()).unwrap();
let message = &result["choices"][0]["message"];
assert_eq!(message["role"], "assistant");
assert_eq!(message["content"], "Hello world!");
}
}