1use serde_json::{Map, Value};
23
24pub fn reassemble(events: &[eventsource_stream::Event]) -> anyhow::Result<String> {
38 let is_responses_api = events.iter().any(|e| e.event.starts_with("response."));
39 if is_responses_api {
40 return reassemble_responses(events);
41 }
42
43 let mut base: Option<Value> = None;
44 let mut choices: std::collections::BTreeMap<u64, Map<String, Value>> = Default::default();
45 let mut usage = Value::Null;
46
47 for event in events {
48 if event.data.is_empty() || event.data == "[DONE]" {
49 continue;
50 }
51 let chunk: Value = serde_json::from_str(&event.data)
52 .map_err(|e| anyhow::anyhow!("Invalid chunk JSON: {}", e))?;
53
54 if base.is_none() {
55 let mut b = chunk.clone();
56 if let Some(obj) = b["object"].as_str() {
57 b["object"] = Value::String(obj.replace(".chunk", ""));
58 }
59 if let Some(m) = b.as_object_mut() {
60 m.remove("choices");
61 m.remove("usage");
62 }
63 base = Some(b);
64 }
65
66 if !chunk["usage"].is_null() {
67 usage = chunk["usage"].clone();
68 }
69
70 if let Some(chunk_choices) = chunk["choices"].as_array() {
71 for choice in chunk_choices {
72 let index = choice["index"].as_u64().unwrap_or(0);
73 let merged = choices.entry(index).or_default();
74
75 if !choice["finish_reason"].is_null() {
76 merged.insert("finish_reason".to_string(), choice["finish_reason"].clone());
77 }
78
79 if let Some(text) = choice["text"].as_str() {
81 let existing = merged
82 .entry("text".to_string())
83 .or_insert(Value::String(String::new()));
84 if let Value::String(s) = existing {
85 s.push_str(text);
86 }
87 }
88
89 if let Some(delta) = choice["delta"].as_object() {
91 let message = merged
92 .entry("message".to_string())
93 .or_insert(Value::Object(Map::new()));
94 if let Value::Object(msg) = message {
95 for (key, value) in delta {
96 if value.is_null() {
97 continue;
98 }
99 match key.as_str() {
100 "tool_calls" => merge_tool_calls(msg, value),
101 _ => merge_delta_field(msg, key, value),
102 }
103 }
104 }
105 }
106 }
107 }
108 }
109
110 let mut response = base.unwrap_or(Value::Object(Map::new()));
111 let assembled_choices: Vec<Value> = choices
112 .into_iter()
113 .map(|(index, mut fields)| {
114 fields.insert("index".to_string(), Value::Number(index.into()));
115 if !fields.contains_key("finish_reason") {
116 fields.insert("finish_reason".to_string(), Value::Null);
117 }
118 Value::Object(fields)
119 })
120 .collect();
121 response["choices"] = Value::Array(assembled_choices);
122 response["usage"] = usage;
123
124 Ok(response.to_string())
125}
126
127fn reassemble_responses(events: &[eventsource_stream::Event]) -> anyhow::Result<String> {
133 for event in events.iter().rev() {
134 if event.event == "response.completed" {
135 let parsed: Value = serde_json::from_str(&event.data)
136 .map_err(|e| anyhow::anyhow!("Invalid response.completed JSON: {}", e))?;
137 if let Some(response) = parsed.get("response") {
138 return serde_json::to_string(response).map_err(Into::into);
139 }
140 anyhow::bail!(
141 "response.completed event JSON does not contain top-level \"response\" field"
142 );
143 }
144 }
145 anyhow::bail!("No response.completed event found in Responses API SSE stream")
146}
147
148fn merge_tool_calls(msg: &mut Map<String, Value>, value: &Value) {
154 let Some(arr) = value.as_array() else { return };
155 let tc_list = msg
156 .entry("tool_calls".to_string())
157 .or_insert(Value::Array(vec![]));
158 let Value::Array(existing) = tc_list else {
159 return;
160 };
161
162 for tc_delta in arr {
163 let idx = tc_delta["index"].as_u64().unwrap_or(0) as usize;
164 while existing.len() <= idx {
165 existing.push(Value::Object(Map::new()));
166 }
167 let slot = existing[idx].as_object_mut().unwrap();
168
169 for field in ["id", "type"] {
171 if let Some(v) = tc_delta.get(field) {
172 if !v.is_null() {
173 slot.insert(field.to_string(), v.clone());
174 }
175 }
176 }
177
178 if let Some(func) = tc_delta["function"].as_object() {
180 let f = slot
181 .entry("function".to_string())
182 .or_insert(Value::Object(Map::new()))
183 .as_object_mut()
184 .unwrap();
185 for field in ["name", "arguments"] {
186 if let Some(s) = func.get(field).and_then(|v| v.as_str()) {
187 let existing = f
188 .entry(field.to_string())
189 .or_insert(Value::String(String::new()));
190 if let Value::String(es) = existing {
191 es.push_str(s);
192 }
193 }
194 }
195 }
196 }
197}
198
199fn merge_delta_field(msg: &mut Map<String, Value>, key: &str, value: &Value) {
205 if key == "role" {
206 msg.insert(key.to_string(), value.clone());
207 } else if let Some(s) = value.as_str() {
208 let existing = msg
209 .entry(key.to_string())
210 .or_insert(Value::String(String::new()));
211 if let Value::String(existing_str) = existing {
212 existing_str.push_str(s);
213 }
214 } else {
215 msg.insert(key.to_string(), value.clone());
216 }
217}
218
219#[cfg(test)]
220mod tests {
221 use super::*;
222 use std::path::PathBuf;
223 use std::sync::Once;
224
225 static GENERATE: Once = Once::new();
226
227 fn ensure_fixtures() {
231 GENERATE.call_once(|| {
232 let (Ok(base_url), Ok(model), Ok(fixture_name)) = (
233 std::env::var("BASE_URL"),
234 std::env::var("MODEL"),
235 std::env::var("FIXTURE_NAME"),
236 ) else {
237 return;
238 };
239 let api_key = std::env::var("API_KEY").unwrap_or_else(|_| "none".to_string());
240 let root = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
241 let fixtures_dir = root.join("fixtures").join(&fixture_name);
242 std::fs::create_dir_all(&fixtures_dir).unwrap();
243
244 let cases: Value = serde_json::from_str(
245 &std::fs::read_to_string(root.join("test_cases.json")).unwrap(),
246 )
247 .unwrap();
248
249 let rt = tokio::runtime::Runtime::new().unwrap();
250 let client = reqwest::Client::new();
251
252 for (name, case) in cases.as_object().unwrap() {
253 let endpoint = case["endpoint"].as_str().unwrap();
254 if endpoint.ends_with("/responses") {
255 rt.block_on(record_responses_fixture(
256 &client,
257 &base_url,
258 &api_key,
259 &model,
260 name,
261 case,
262 &fixtures_dir,
263 ));
264 } else {
265 rt.block_on(record_fixture(
266 &client,
267 &base_url,
268 &api_key,
269 &model,
270 name,
271 case,
272 &fixtures_dir,
273 ));
274 }
275 }
276 });
277 }
278
279 fn fixture_providers() -> Vec<String> {
281 let root = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
282 let fixtures_dir = root.join("fixtures");
283 let mut providers: Vec<String> = std::fs::read_dir(&fixtures_dir)
284 .unwrap()
285 .filter_map(|entry| {
286 let entry = entry.ok()?;
287 if entry.file_type().ok()?.is_dir() {
288 Some(entry.file_name().to_string_lossy().to_string())
289 } else {
290 None
291 }
292 })
293 .collect();
294 providers.sort();
295 providers
296 }
297
298 async fn record_fixture(
299 client: &reqwest::Client,
300 base_url: &str,
301 api_key: &str,
302 model: &str,
303 name: &str,
304 case: &Value,
305 fixtures_dir: &PathBuf,
306 ) {
307 let endpoint = case["endpoint"].as_str().unwrap();
308 let url = format!("{base_url}{endpoint}");
309 let mut body = case["body"].as_object().unwrap().clone();
310 body.insert("model".to_string(), Value::String(model.to_string()));
311 body.insert("temperature".to_string(), Value::Number(0.into()));
312 body.insert("seed".to_string(), Value::Number(42.into()));
313
314 let mut non_stream_body = body.clone();
316 non_stream_body.insert("stream".to_string(), Value::Bool(false));
317 eprintln!("[{name}] POST {url} (non-streaming)");
318 let expected: Value = client
319 .post(&url)
320 .bearer_auth(api_key)
321 .json(&non_stream_body)
322 .send()
323 .await
324 .unwrap_or_else(|e| panic!("{name}: non-streaming request failed: {e}"))
325 .json()
326 .await
327 .unwrap_or_else(|e| panic!("{name}: non-streaming parse failed: {e}"));
328 eprintln!("[{name}] non-streaming response received");
329
330 let mut stream_body = body.clone();
332 stream_body.insert("stream".to_string(), Value::Bool(true));
333 let mut stream_opts = serde_json::Map::new();
334 stream_opts.insert("include_usage".to_string(), Value::Bool(true));
335 stream_body.insert("stream_options".to_string(), Value::Object(stream_opts));
336
337 eprintln!("[{name}] POST {url} (streaming)");
338 let response_text = client
339 .post(&url)
340 .bearer_auth(api_key)
341 .json(&stream_body)
342 .send()
343 .await
344 .unwrap_or_else(|e| panic!("{name}: streaming request failed: {e}"))
345 .text()
346 .await
347 .unwrap_or_else(|e| panic!("{name}: streaming read failed: {e}"));
348
349 let mut chunks: Vec<Value> = vec![];
350 for line in response_text.lines() {
351 if let Some(data) = line.strip_prefix("data: ") {
352 if data == "[DONE]" {
353 chunks.push(Value::String("[DONE]".to_string()));
354 } else if let Ok(parsed) = serde_json::from_str::<Value>(data) {
355 chunks.push(parsed);
356 }
357 }
358 }
359
360 eprintln!("[{name}] streaming response: {} chunks", chunks.len());
361
362 let fixture = serde_json::json!({ "chunks": chunks, "expected": expected });
363 let path = fixtures_dir.join(format!("{name}.json"));
364 std::fs::write(
365 &path,
366 serde_json::to_string_pretty(&fixture).unwrap() + "\n",
367 )
368 .unwrap_or_else(|e| panic!("{name}: failed to write fixture: {e}"));
369 eprintln!("[{name}] fixture written to {}", path.display());
370 }
371
372 async fn record_responses_fixture(
378 client: &reqwest::Client,
379 base_url: &str,
380 api_key: &str,
381 model: &str,
382 name: &str,
383 case: &Value,
384 fixtures_dir: &PathBuf,
385 ) {
386 let endpoint = case["endpoint"].as_str().unwrap();
387 let url = format!("{base_url}{endpoint}");
388 let mut body = case["body"].as_object().unwrap().clone();
389 body.insert("model".to_string(), Value::String(model.to_string()));
390 body.insert("temperature".to_string(), Value::Number(0.into()));
391 body.insert("seed".to_string(), Value::Number(42.into()));
392
393 eprintln!("[{name}] POST {url} (non-streaming)");
395 let expected: Value = client
396 .post(&url)
397 .bearer_auth(api_key)
398 .json(&body)
399 .send()
400 .await
401 .unwrap_or_else(|e| panic!("{name}: non-streaming request failed: {e}"))
402 .json()
403 .await
404 .unwrap_or_else(|e| panic!("{name}: non-streaming parse failed: {e}"));
405 eprintln!("[{name}] non-streaming response received");
406
407 body.insert("stream".to_string(), Value::Bool(true));
409
410 eprintln!("[{name}] POST {url} (streaming)");
411 let response_text = client
412 .post(&url)
413 .bearer_auth(api_key)
414 .json(&body)
415 .send()
416 .await
417 .unwrap_or_else(|e| panic!("{name}: streaming request failed: {e}"))
418 .text()
419 .await
420 .unwrap_or_else(|e| panic!("{name}: streaming read failed: {e}"));
421
422 let mut events: Vec<Value> = vec![];
425 let mut current_event_type: Option<String> = None;
426 let mut current_data_lines: Vec<String> = Vec::new();
427
428 for raw_line in response_text.lines() {
429 let line = raw_line.trim_end_matches('\r');
430 if line.is_empty() {
431 if !current_data_lines.is_empty() {
432 let data_str = current_data_lines.join("\n");
433 if data_str != "[DONE]" {
434 if let Ok(parsed) = serde_json::from_str::<Value>(&data_str) {
435 let event_type = current_event_type.clone().unwrap_or_default();
436 events.push(
437 serde_json::json!({ "event_type": event_type, "data": parsed }),
438 );
439 }
440 }
441 }
442 current_event_type = None;
443 current_data_lines.clear();
444 } else if let Some(event_type) = line
445 .strip_prefix("event: ")
446 .or_else(|| line.strip_prefix("event:"))
447 {
448 current_event_type = Some(event_type.to_string());
449 } else if let Some(data) = line
450 .strip_prefix("data: ")
451 .or_else(|| line.strip_prefix("data:"))
452 {
453 current_data_lines.push(data.to_string());
454 }
455 }
456
457 if !current_data_lines.is_empty() {
459 let data_str = current_data_lines.join("\n");
460 if data_str != "[DONE]" {
461 if let Ok(parsed) = serde_json::from_str::<Value>(&data_str) {
462 let event_type = current_event_type.clone().unwrap_or_default();
463 events.push(
464 serde_json::json!({ "event_type": event_type, "data": parsed }),
465 );
466 }
467 }
468 }
469
470 eprintln!("[{name}] streaming response: {} events", events.len());
471
472 let fixture = serde_json::json!({ "events": events, "expected": expected });
473 let path = fixtures_dir.join(format!("{name}.json"));
474 std::fs::write(
475 &path,
476 serde_json::to_string_pretty(&fixture).unwrap() + "\n",
477 )
478 .unwrap_or_else(|e| panic!("{name}: failed to write fixture: {e}"));
479 eprintln!("[{name}] fixture written to {}", path.display());
480 }
481
482 fn diff(
485 actual: &Value,
486 expected: &Value,
487 path: &str,
488 skip: &[String],
489 errors: &mut Vec<String>,
490 ) {
491 match (actual, expected) {
492 (Value::Object(a), Value::Object(e)) => {
493 for (key, ev) in e {
494 if skip.iter().any(|s| s == key) {
495 continue;
496 }
497 let p = if path.is_empty() {
498 key.clone()
499 } else {
500 format!("{path}.{key}")
501 };
502 match a.get(key) {
503 Some(av) => diff(av, ev, &p, skip, errors),
504 None if ev.is_null() => {} None => errors.push(format!("{p}: missing from reassembled output")),
506 }
507 }
508 for key in a.keys() {
509 if skip.iter().any(|s| s == key) {
510 continue;
511 }
512 if !e.contains_key(key) {
513 let p = if path.is_empty() {
514 key.clone()
515 } else {
516 format!("{path}.{key}")
517 };
518 errors.push(format!("{p}: unexpected field in reassembled output"));
519 }
520 }
521 }
522 (Value::Array(a), Value::Array(e)) => {
523 if a.len() != e.len() {
524 errors.push(format!(
525 "{path}: array length {}, expected {}",
526 a.len(),
527 e.len()
528 ));
529 return;
530 }
531 for (i, (av, ev)) in a.iter().zip(e).enumerate() {
532 diff(av, ev, &format!("{path}[{i}]"), skip, errors);
533 }
534 }
535 _ => {
536 if actual != expected {
537 if path.ends_with(".arguments") {
539 if let (Some(a), Some(e)) = (actual.as_str(), expected.as_str()) {
540 let ap: Result<Value, _> = serde_json::from_str(a);
541 let ep: Result<Value, _> = serde_json::from_str(e);
542 if let (Ok(ap), Ok(ep)) = (ap, ep) {
543 if ap == ep {
544 return;
545 }
546 }
547 }
548 }
549 errors.push(format!("{path}: got {actual}, expected {expected}"));
550 }
551 }
552 }
553 }
554
555 fn assert_fixture(provider: &str, name: &str) {
556 ensure_fixtures();
557 let root = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
558
559 let cases: Value =
561 serde_json::from_str(&std::fs::read_to_string(root.join("test_cases.json")).unwrap())
562 .unwrap();
563 let skip: Vec<String> = cases[name]["allowed_mismatches"]
564 .as_array()
565 .map(|a| a.iter().map(|v| v.as_str().unwrap().to_string()).collect())
566 .unwrap_or_default();
567
568 let path = root
569 .join("fixtures")
570 .join(provider)
571 .join(format!("{name}.json"));
572 let content = std::fs::read_to_string(&path)
573 .unwrap_or_else(|e| panic!("missing fixture {}: {e}", path.display()));
574 let fixture: Value = serde_json::from_str(&content).unwrap();
575
576 let events: Vec<eventsource_stream::Event> = fixture["chunks"]
577 .as_array()
578 .unwrap()
579 .iter()
580 .map(|chunk| eventsource_stream::Event {
581 data: if chunk.is_string() {
582 chunk.as_str().unwrap().to_string()
583 } else {
584 chunk.to_string()
585 },
586 ..Default::default()
587 })
588 .collect();
589
590 let actual: Value = serde_json::from_str(&reassemble(&events).unwrap()).unwrap();
591
592 let mut errors = vec![];
593 diff(&actual, &fixture["expected"], "", &skip, &mut errors);
594 if !errors.is_empty() {
595 panic!("fixture {provider}/{name}:\n{}", errors.join("\n"));
596 }
597 }
598
599 fn assert_responses_fixture(provider: &str, name: &str) {
604 ensure_fixtures();
605 let root = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
606
607 let cases: Value =
608 serde_json::from_str(&std::fs::read_to_string(root.join("test_cases.json")).unwrap())
609 .unwrap();
610 let skip: Vec<String> = cases[name]["allowed_mismatches"]
611 .as_array()
612 .map(|a| a.iter().map(|v| v.as_str().unwrap().to_string()).collect())
613 .unwrap_or_default();
614
615 let path = root
616 .join("fixtures")
617 .join(provider)
618 .join(format!("{name}.json"));
619 let content = std::fs::read_to_string(&path)
620 .unwrap_or_else(|e| panic!("missing fixture {}: {e}", path.display()));
621 let fixture: Value = serde_json::from_str(&content).unwrap();
622
623 let events: Vec<eventsource_stream::Event> = fixture["events"]
624 .as_array()
625 .unwrap()
626 .iter()
627 .map(|ev| eventsource_stream::Event {
628 event: ev["event_type"]
629 .as_str()
630 .unwrap_or_default()
631 .to_string(),
632 data: ev["data"].to_string(),
633 ..Default::default()
634 })
635 .collect();
636
637 let actual: Value = serde_json::from_str(&reassemble(&events).unwrap()).unwrap();
638
639 let mut errors = vec![];
640 diff(&actual, &fixture["expected"], "", &skip, &mut errors);
641 if !errors.is_empty() {
642 panic!("fixture {provider}/{name}:\n{}", errors.join("\n"));
643 }
644 }
645
646 #[test]
652 fn all_fixtures() {
653 ensure_fixtures();
654 let root = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
655 let cases: Value =
656 serde_json::from_str(&std::fs::read_to_string(root.join("test_cases.json")).unwrap())
657 .unwrap();
658
659 let providers = fixture_providers();
660 assert!(
661 !providers.is_empty(),
662 "No fixture provider directories found under fixtures/"
663 );
664
665 for provider in &providers {
666 let provider_dir = root.join("fixtures").join(provider);
667 let mut ran = 0;
668 for (name, case) in cases.as_object().unwrap() {
669 let fixture_path = provider_dir.join(format!("{name}.json"));
670 if !fixture_path.exists() {
671 eprintln!("[skip] {provider}/{name}: fixture file not present");
672 continue;
673 }
674
675 let endpoint = case["endpoint"].as_str().unwrap();
676 eprintln!("[test] {provider}/{name}");
677 if endpoint.ends_with("/responses") {
678 assert_responses_fixture(provider, name);
679 } else {
680 assert_fixture(provider, name);
681 }
682 ran += 1;
683 }
684 assert!(ran > 0, "Provider {provider} has no fixture files");
685 }
686 }
687
688 #[test]
690 fn role_not_concatenated() {
691 let events: Vec<eventsource_stream::Event> = vec![
692 eventsource_stream::Event {
693 data: r#"{"id":"1","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"role":"assistant","content":"Hello"}}]}"#.to_string(),
694 ..Default::default()
695 },
696 eventsource_stream::Event {
697 data: r#"{"id":"1","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"role":"assistant","content":" world"}}]}"#.to_string(),
698 ..Default::default()
699 },
700 eventsource_stream::Event {
701 data: r#"{"id":"1","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"role":"assistant","content":"!"},"finish_reason":"stop"}]}"#.to_string(),
702 ..Default::default()
703 },
704 ];
705
706 let result: Value = serde_json::from_str(&reassemble(&events).unwrap()).unwrap();
707 let message = &result["choices"][0]["message"];
708 assert_eq!(message["role"], "assistant");
709 assert_eq!(message["content"], "Hello world!");
710 }
711}