Skip to main content

openai_reassembler/
lib.rs

1//! Reassemble OpenAI-compatible SSE streaming responses into non-streaming format.
2//!
3//! When an OpenAI-compatible API streams a response as Server-Sent Events (SSE),
4//! each event contains a partial "chunk" of the final response. This crate provides
5//! a function to merge those chunks into the equivalent non-streaming JSON response.
6//!
7//! # Supported formats
8//!
9//! - **Chat completions** (`/v1/chat/completions`): merges `choices[].delta` fields
10//!   into `choices[].message`, concatenating string values (e.g. `content`, `refusal`)
11//!   and assembling `tool_calls` by index. Other non-string delta fields use last-value-wins.
12//! - **Legacy completions** (`/v1/completions`): concatenates `choices[].text`.
13//! - **Responses API** (`/v1/responses`): extracts the full response from the
14//!   `response.completed` event.
15//! - **Multiple choices**: tracked independently by `index`.
//! - **Usage**: taken from the last chunk that carries a non-null `usage` field.
17//!
18//! Format detection is automatic: if any event's `event` field (from
19//! `eventsource_stream::Event`) starts with `"response."`, the Responses API
20//! path is used; otherwise the completions path.
21
22use serde_json::{Map, Value};
23
24/// Reassemble OpenAI-compatible streaming chunks into a non-streaming response.
25///
26/// Auto-detects the stream format and dispatches accordingly:
27/// - **Responses API**: if any event's `event` field starts with `"response."`,
28///   extracts the full response from the `response.completed` event.
29/// - **Completions**: otherwise, merges `choices[].delta` / `choices[].text` chunks.
30///
31/// For the chat and legacy completions endpoints, top-level fields (`id`, `created`,
32/// `model`, etc.) are taken from the first chunk. In this completions path, the
33/// `object` field has the `.chunk` suffix stripped (e.g. `chat.completion.chunk`
34/// → `chat.completion`). Responses API objects are left unchanged.
35///
36/// Events with empty data or `[DONE]` are skipped.
37pub fn reassemble(events: &[eventsource_stream::Event]) -> anyhow::Result<String> {
38    let is_responses_api = events.iter().any(|e| e.event.starts_with("response."));
39    if is_responses_api {
40        return reassemble_responses(events);
41    }
42
43    let mut base: Option<Value> = None;
44    let mut choices: std::collections::BTreeMap<u64, Map<String, Value>> = Default::default();
45    let mut usage = Value::Null;
46
47    for event in events {
48        if event.data.is_empty() || event.data == "[DONE]" {
49            continue;
50        }
51        let chunk: Value = serde_json::from_str(&event.data)
52            .map_err(|e| anyhow::anyhow!("Invalid chunk JSON: {}", e))?;
53
54        if base.is_none() {
55            let mut b = chunk.clone();
56            if let Some(obj) = b["object"].as_str() {
57                b["object"] = Value::String(obj.replace(".chunk", ""));
58            }
59            if let Some(m) = b.as_object_mut() {
60                m.remove("choices");
61                m.remove("usage");
62            }
63            base = Some(b);
64        }
65
66        if !chunk["usage"].is_null() {
67            usage = chunk["usage"].clone();
68        }
69
70        if let Some(chunk_choices) = chunk["choices"].as_array() {
71            for choice in chunk_choices {
72                let index = choice["index"].as_u64().unwrap_or(0);
73                let merged = choices.entry(index).or_default();
74
75                if !choice["finish_reason"].is_null() {
76                    merged.insert("finish_reason".to_string(), choice["finish_reason"].clone());
77                }
78
79                // Legacy completions: concatenate "text"
80                if let Some(text) = choice["text"].as_str() {
81                    let existing = merged
82                        .entry("text".to_string())
83                        .or_insert(Value::String(String::new()));
84                    if let Value::String(s) = existing {
85                        s.push_str(text);
86                    }
87                }
88
89                // Chat completions: merge "delta" into "message"
90                if let Some(delta) = choice["delta"].as_object() {
91                    let message = merged
92                        .entry("message".to_string())
93                        .or_insert(Value::Object(Map::new()));
94                    if let Value::Object(msg) = message {
95                        for (key, value) in delta {
96                            if value.is_null() {
97                                continue;
98                            }
99                            match key.as_str() {
100                                "tool_calls" => merge_tool_calls(msg, value),
101                                _ => merge_delta_field(msg, key, value),
102                            }
103                        }
104                    }
105                }
106            }
107        }
108    }
109
110    let mut response = base.unwrap_or(Value::Object(Map::new()));
111    let assembled_choices: Vec<Value> = choices
112        .into_iter()
113        .map(|(index, mut fields)| {
114            fields.insert("index".to_string(), Value::Number(index.into()));
115            if !fields.contains_key("finish_reason") {
116                fields.insert("finish_reason".to_string(), Value::Null);
117            }
118            Value::Object(fields)
119        })
120        .collect();
121    response["choices"] = Value::Array(assembled_choices);
122    response["usage"] = usage;
123
124    Ok(response.to_string())
125}
126
127/// Reassemble a Responses API SSE stream into a non-streaming response.
128///
129/// The Responses API emits typed events (`response.created`, `response.output_text.delta`,
130/// etc.). The final `response.completed` event contains the full response object under
131/// the `"response"` key. This function finds that event and extracts the response.
132fn reassemble_responses(events: &[eventsource_stream::Event]) -> anyhow::Result<String> {
133    for event in events.iter().rev() {
134        if event.event == "response.completed" {
135            let parsed: Value = serde_json::from_str(&event.data)
136                .map_err(|e| anyhow::anyhow!("Invalid response.completed JSON: {}", e))?;
137            if let Some(response) = parsed.get("response") {
138                return serde_json::to_string(response).map_err(Into::into);
139            }
140            anyhow::bail!(
141                "response.completed event JSON does not contain top-level \"response\" field"
142            );
143        }
144    }
145    anyhow::bail!("No response.completed event found in Responses API SSE stream")
146}
147
148/// Merge streamed tool_calls deltas into the accumulated message.
149///
150/// Tool calls arrive as an array of deltas, each with an `index` field indicating
151/// which tool call slot they belong to. `id` and `type` are set once; `function.name`
152/// and `function.arguments` are concatenated across chunks.
153fn merge_tool_calls(msg: &mut Map<String, Value>, value: &Value) {
154    let Some(arr) = value.as_array() else { return };
155    let tc_list = msg
156        .entry("tool_calls".to_string())
157        .or_insert(Value::Array(vec![]));
158    let Value::Array(existing) = tc_list else {
159        return;
160    };
161
162    for tc_delta in arr {
163        let idx = tc_delta["index"].as_u64().unwrap_or(0) as usize;
164        while existing.len() <= idx {
165            existing.push(Value::Object(Map::new()));
166        }
167        let slot = existing[idx].as_object_mut().unwrap();
168
169        // Set id and type (arrive once, on the first delta for this tool call)
170        for field in ["id", "type"] {
171            if let Some(v) = tc_delta.get(field) {
172                if !v.is_null() {
173                    slot.insert(field.to_string(), v.clone());
174                }
175            }
176        }
177
178        // Concatenate function name and arguments
179        if let Some(func) = tc_delta["function"].as_object() {
180            let f = slot
181                .entry("function".to_string())
182                .or_insert(Value::Object(Map::new()))
183                .as_object_mut()
184                .unwrap();
185            for field in ["name", "arguments"] {
186                if let Some(s) = func.get(field).and_then(|v| v.as_str()) {
187                    let existing = f
188                        .entry(field.to_string())
189                        .or_insert(Value::String(String::new()));
190                    if let Value::String(es) = existing {
191                        es.push_str(s);
192                    }
193                }
194            }
195        }
196    }
197}
198
199/// Merge a single delta field into the accumulated message.
200///
201/// String fields (content, refusal, etc.) are concatenated.
202/// The `role` field uses last-value-wins (providers may send it on every chunk).
203/// Non-string fields use last-value-wins.
204fn merge_delta_field(msg: &mut Map<String, Value>, key: &str, value: &Value) {
205    if key == "role" {
206        msg.insert(key.to_string(), value.clone());
207    } else if let Some(s) = value.as_str() {
208        let existing = msg
209            .entry(key.to_string())
210            .or_insert(Value::String(String::new()));
211        if let Value::String(existing_str) = existing {
212            existing_str.push_str(s);
213        }
214    } else {
215        msg.insert(key.to_string(), value.clone());
216    }
217}
218
219#[cfg(test)]
220mod tests {
221    use super::*;
222    use std::path::PathBuf;
223    use std::sync::Once;
224
225    static GENERATE: Once = Once::new();
226
227    /// If BASE_URL, MODEL, and FIXTURE_NAME are set, generate fixtures for that
228    /// provider once before tests run. Fixtures are written to
229    /// `fixtures/{FIXTURE_NAME}/`.
230    fn ensure_fixtures() {
231        GENERATE.call_once(|| {
232            let (Ok(base_url), Ok(model), Ok(fixture_name)) = (
233                std::env::var("BASE_URL"),
234                std::env::var("MODEL"),
235                std::env::var("FIXTURE_NAME"),
236            ) else {
237                return;
238            };
239            let api_key = std::env::var("API_KEY").unwrap_or_else(|_| "none".to_string());
240            let root = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
241            let fixtures_dir = root.join("fixtures").join(&fixture_name);
242            std::fs::create_dir_all(&fixtures_dir).unwrap();
243
244            let cases: Value = serde_json::from_str(
245                &std::fs::read_to_string(root.join("test_cases.json")).unwrap(),
246            )
247            .unwrap();
248
249            let rt = tokio::runtime::Runtime::new().unwrap();
250            let client = reqwest::Client::new();
251
252            for (name, case) in cases.as_object().unwrap() {
253                let endpoint = case["endpoint"].as_str().unwrap();
254                if endpoint.ends_with("/responses") {
255                    rt.block_on(record_responses_fixture(
256                        &client,
257                        &base_url,
258                        &api_key,
259                        &model,
260                        name,
261                        case,
262                        &fixtures_dir,
263                    ));
264                } else {
265                    rt.block_on(record_fixture(
266                        &client,
267                        &base_url,
268                        &api_key,
269                        &model,
270                        name,
271                        case,
272                        &fixtures_dir,
273                    ));
274                }
275            }
276        });
277    }
278
279    /// Discover all fixture provider directories under `fixtures/`.
280    fn fixture_providers() -> Vec<String> {
281        let root = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
282        let fixtures_dir = root.join("fixtures");
283        let mut providers: Vec<String> = std::fs::read_dir(&fixtures_dir)
284            .unwrap()
285            .filter_map(|entry| {
286                let entry = entry.ok()?;
287                if entry.file_type().ok()?.is_dir() {
288                    Some(entry.file_name().to_string_lossy().to_string())
289                } else {
290                    None
291                }
292            })
293            .collect();
294        providers.sort();
295        providers
296    }
297
298    async fn record_fixture(
299        client: &reqwest::Client,
300        base_url: &str,
301        api_key: &str,
302        model: &str,
303        name: &str,
304        case: &Value,
305        fixtures_dir: &PathBuf,
306    ) {
307        let endpoint = case["endpoint"].as_str().unwrap();
308        let url = format!("{base_url}{endpoint}");
309        let mut body = case["body"].as_object().unwrap().clone();
310        body.insert("model".to_string(), Value::String(model.to_string()));
311        body.insert("temperature".to_string(), Value::Number(0.into()));
312        body.insert("seed".to_string(), Value::Number(42.into()));
313
314        // Non-streaming
315        let mut non_stream_body = body.clone();
316        non_stream_body.insert("stream".to_string(), Value::Bool(false));
317        eprintln!("[{name}] POST {url} (non-streaming)");
318        let expected: Value = client
319            .post(&url)
320            .bearer_auth(api_key)
321            .json(&non_stream_body)
322            .send()
323            .await
324            .unwrap_or_else(|e| panic!("{name}: non-streaming request failed: {e}"))
325            .json()
326            .await
327            .unwrap_or_else(|e| panic!("{name}: non-streaming parse failed: {e}"));
328        eprintln!("[{name}] non-streaming response received");
329
330        // Streaming
331        let mut stream_body = body.clone();
332        stream_body.insert("stream".to_string(), Value::Bool(true));
333        let mut stream_opts = serde_json::Map::new();
334        stream_opts.insert("include_usage".to_string(), Value::Bool(true));
335        stream_body.insert("stream_options".to_string(), Value::Object(stream_opts));
336
337        eprintln!("[{name}] POST {url} (streaming)");
338        let response_text = client
339            .post(&url)
340            .bearer_auth(api_key)
341            .json(&stream_body)
342            .send()
343            .await
344            .unwrap_or_else(|e| panic!("{name}: streaming request failed: {e}"))
345            .text()
346            .await
347            .unwrap_or_else(|e| panic!("{name}: streaming read failed: {e}"));
348
349        let mut chunks: Vec<Value> = vec![];
350        for line in response_text.lines() {
351            if let Some(data) = line.strip_prefix("data: ") {
352                if data == "[DONE]" {
353                    chunks.push(Value::String("[DONE]".to_string()));
354                } else if let Ok(parsed) = serde_json::from_str::<Value>(data) {
355                    chunks.push(parsed);
356                }
357            }
358        }
359
360        eprintln!("[{name}] streaming response: {} chunks", chunks.len());
361
362        let fixture = serde_json::json!({ "chunks": chunks, "expected": expected });
363        let path = fixtures_dir.join(format!("{name}.json"));
364        std::fs::write(
365            &path,
366            serde_json::to_string_pretty(&fixture).unwrap() + "\n",
367        )
368        .unwrap_or_else(|e| panic!("{name}: failed to write fixture: {e}"));
369        eprintln!("[{name}] fixture written to {}", path.display());
370    }
371
372    /// Record a fixture for the Responses API.
373    ///
374    /// Unlike completions, responses SSE events are typed (e.g. `event: response.created`)
375    /// and the non-streaming request omits `stream` entirely rather than setting it to false.
376    /// Usage is always included in `response.completed` without needing `stream_options`.
377    async fn record_responses_fixture(
378        client: &reqwest::Client,
379        base_url: &str,
380        api_key: &str,
381        model: &str,
382        name: &str,
383        case: &Value,
384        fixtures_dir: &PathBuf,
385    ) {
386        let endpoint = case["endpoint"].as_str().unwrap();
387        let url = format!("{base_url}{endpoint}");
388        let mut body = case["body"].as_object().unwrap().clone();
389        body.insert("model".to_string(), Value::String(model.to_string()));
390        body.insert("temperature".to_string(), Value::Number(0.into()));
391        body.insert("seed".to_string(), Value::Number(42.into()));
392
393        // Non-streaming (no stream field at all for responses API)
394        eprintln!("[{name}] POST {url} (non-streaming)");
395        let expected: Value = client
396            .post(&url)
397            .bearer_auth(api_key)
398            .json(&body)
399            .send()
400            .await
401            .unwrap_or_else(|e| panic!("{name}: non-streaming request failed: {e}"))
402            .json()
403            .await
404            .unwrap_or_else(|e| panic!("{name}: non-streaming parse failed: {e}"));
405        eprintln!("[{name}] non-streaming response received");
406
407        // Streaming
408        body.insert("stream".to_string(), Value::Bool(true));
409
410        eprintln!("[{name}] POST {url} (streaming)");
411        let response_text = client
412            .post(&url)
413            .bearer_auth(api_key)
414            .json(&body)
415            .send()
416            .await
417            .unwrap_or_else(|e| panic!("{name}: streaming request failed: {e}"))
418            .text()
419            .await
420            .unwrap_or_else(|e| panic!("{name}: streaming read failed: {e}"));
421
422        // Parse SSE events preserving event types (spec-compliant: accumulate
423        // data lines until a blank line delimits the event).
424        let mut events: Vec<Value> = vec![];
425        let mut current_event_type: Option<String> = None;
426        let mut current_data_lines: Vec<String> = Vec::new();
427
428        for raw_line in response_text.lines() {
429            let line = raw_line.trim_end_matches('\r');
430            if line.is_empty() {
431                if !current_data_lines.is_empty() {
432                    let data_str = current_data_lines.join("\n");
433                    if data_str != "[DONE]" {
434                        if let Ok(parsed) = serde_json::from_str::<Value>(&data_str) {
435                            let event_type = current_event_type.clone().unwrap_or_default();
436                            events.push(
437                                serde_json::json!({ "event_type": event_type, "data": parsed }),
438                            );
439                        }
440                    }
441                }
442                current_event_type = None;
443                current_data_lines.clear();
444            } else if let Some(event_type) = line
445                .strip_prefix("event: ")
446                .or_else(|| line.strip_prefix("event:"))
447            {
448                current_event_type = Some(event_type.to_string());
449            } else if let Some(data) = line
450                .strip_prefix("data: ")
451                .or_else(|| line.strip_prefix("data:"))
452            {
453                current_data_lines.push(data.to_string());
454            }
455        }
456
457        // Finalize any event not terminated by a trailing blank line
458        if !current_data_lines.is_empty() {
459            let data_str = current_data_lines.join("\n");
460            if data_str != "[DONE]" {
461                if let Ok(parsed) = serde_json::from_str::<Value>(&data_str) {
462                    let event_type = current_event_type.clone().unwrap_or_default();
463                    events.push(
464                        serde_json::json!({ "event_type": event_type, "data": parsed }),
465                    );
466                }
467            }
468        }
469
470        eprintln!("[{name}] streaming response: {} events", events.len());
471
472        let fixture = serde_json::json!({ "events": events, "expected": expected });
473        let path = fixtures_dir.join(format!("{name}.json"));
474        std::fs::write(
475            &path,
476            serde_json::to_string_pretty(&fixture).unwrap() + "\n",
477        )
478        .unwrap_or_else(|e| panic!("{name}: failed to write fixture: {e}"));
479        eprintln!("[{name}] fixture written to {}", path.display());
480    }
481
482    /// Recursively compare two JSON values, collecting mismatches.
483    /// Fields in `skip` are skipped at any nesting depth.
484    fn diff(
485        actual: &Value,
486        expected: &Value,
487        path: &str,
488        skip: &[String],
489        errors: &mut Vec<String>,
490    ) {
491        match (actual, expected) {
492            (Value::Object(a), Value::Object(e)) => {
493                for (key, ev) in e {
494                    if skip.iter().any(|s| s == key) {
495                        continue;
496                    }
497                    let p = if path.is_empty() {
498                        key.clone()
499                    } else {
500                        format!("{path}.{key}")
501                    };
502                    match a.get(key) {
503                        Some(av) => diff(av, ev, &p, skip, errors),
504                        None if ev.is_null() => {} // missing field == explicit null
505                        None => errors.push(format!("{p}: missing from reassembled output")),
506                    }
507                }
508                for key in a.keys() {
509                    if skip.iter().any(|s| s == key) {
510                        continue;
511                    }
512                    if !e.contains_key(key) {
513                        let p = if path.is_empty() {
514                            key.clone()
515                        } else {
516                            format!("{path}.{key}")
517                        };
518                        errors.push(format!("{p}: unexpected field in reassembled output"));
519                    }
520                }
521            }
522            (Value::Array(a), Value::Array(e)) => {
523                if a.len() != e.len() {
524                    errors.push(format!(
525                        "{path}: array length {}, expected {}",
526                        a.len(),
527                        e.len()
528                    ));
529                    return;
530                }
531                for (i, (av, ev)) in a.iter().zip(e).enumerate() {
532                    diff(av, ev, &format!("{path}[{i}]"), skip, errors);
533                }
534            }
535            _ => {
536                if actual != expected {
537                    // Tool call arguments: compare as parsed JSON (whitespace may differ)
538                    if path.ends_with(".arguments") {
539                        if let (Some(a), Some(e)) = (actual.as_str(), expected.as_str()) {
540                            let ap: Result<Value, _> = serde_json::from_str(a);
541                            let ep: Result<Value, _> = serde_json::from_str(e);
542                            if let (Ok(ap), Ok(ep)) = (ap, ep) {
543                                if ap == ep {
544                                    return;
545                                }
546                            }
547                        }
548                    }
549                    errors.push(format!("{path}: got {actual}, expected {expected}"));
550                }
551            }
552        }
553    }
554
555    fn assert_fixture(provider: &str, name: &str) {
556        ensure_fixtures();
557        let root = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
558
559        // Load allowed_mismatches from test_cases.json
560        let cases: Value =
561            serde_json::from_str(&std::fs::read_to_string(root.join("test_cases.json")).unwrap())
562                .unwrap();
563        let skip: Vec<String> = cases[name]["allowed_mismatches"]
564            .as_array()
565            .map(|a| a.iter().map(|v| v.as_str().unwrap().to_string()).collect())
566            .unwrap_or_default();
567
568        let path = root
569            .join("fixtures")
570            .join(provider)
571            .join(format!("{name}.json"));
572        let content = std::fs::read_to_string(&path)
573            .unwrap_or_else(|e| panic!("missing fixture {}: {e}", path.display()));
574        let fixture: Value = serde_json::from_str(&content).unwrap();
575
576        let events: Vec<eventsource_stream::Event> = fixture["chunks"]
577            .as_array()
578            .unwrap()
579            .iter()
580            .map(|chunk| eventsource_stream::Event {
581                data: if chunk.is_string() {
582                    chunk.as_str().unwrap().to_string()
583                } else {
584                    chunk.to_string()
585                },
586                ..Default::default()
587            })
588            .collect();
589
590        let actual: Value = serde_json::from_str(&reassemble(&events).unwrap()).unwrap();
591
592        let mut errors = vec![];
593        diff(&actual, &fixture["expected"], "", &skip, &mut errors);
594        if !errors.is_empty() {
595            panic!("fixture {provider}/{name}:\n{}", errors.join("\n"));
596        }
597    }
598
599    /// Load a Responses API fixture and verify reassembly matches the expected response.
600    ///
601    /// Responses fixtures store events as `{ "event_type": ..., "data": ... }` objects
602    /// under the `"events"` key (not `"chunks"`).
603    fn assert_responses_fixture(provider: &str, name: &str) {
604        ensure_fixtures();
605        let root = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
606
607        let cases: Value =
608            serde_json::from_str(&std::fs::read_to_string(root.join("test_cases.json")).unwrap())
609                .unwrap();
610        let skip: Vec<String> = cases[name]["allowed_mismatches"]
611            .as_array()
612            .map(|a| a.iter().map(|v| v.as_str().unwrap().to_string()).collect())
613            .unwrap_or_default();
614
615        let path = root
616            .join("fixtures")
617            .join(provider)
618            .join(format!("{name}.json"));
619        let content = std::fs::read_to_string(&path)
620            .unwrap_or_else(|e| panic!("missing fixture {}: {e}", path.display()));
621        let fixture: Value = serde_json::from_str(&content).unwrap();
622
623        let events: Vec<eventsource_stream::Event> = fixture["events"]
624            .as_array()
625            .unwrap()
626            .iter()
627            .map(|ev| eventsource_stream::Event {
628                event: ev["event_type"]
629                    .as_str()
630                    .unwrap_or_default()
631                    .to_string(),
632                data: ev["data"].to_string(),
633                ..Default::default()
634            })
635            .collect();
636
637        let actual: Value = serde_json::from_str(&reassemble(&events).unwrap()).unwrap();
638
639        let mut errors = vec![];
640        diff(&actual, &fixture["expected"], "", &skip, &mut errors);
641        if !errors.is_empty() {
642            panic!("fixture {provider}/{name}:\n{}", errors.join("\n"));
643        }
644    }
645
646    /// Dynamically test all fixtures across all providers.
647    ///
648    /// Iterates over each subdirectory in `fixtures/` (each is a provider like
649    /// "vllm" or "dynamo"), and for each fixture file found, runs the appropriate
650    /// assertion based on whether it's a responses or completions fixture.
651    #[test]
652    fn all_fixtures() {
653        ensure_fixtures();
654        let root = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
655        let cases: Value =
656            serde_json::from_str(&std::fs::read_to_string(root.join("test_cases.json")).unwrap())
657                .unwrap();
658
659        let providers = fixture_providers();
660        assert!(
661            !providers.is_empty(),
662            "No fixture provider directories found under fixtures/"
663        );
664
665        for provider in &providers {
666            let provider_dir = root.join("fixtures").join(provider);
667            let mut ran = 0;
668            for (name, case) in cases.as_object().unwrap() {
669                let fixture_path = provider_dir.join(format!("{name}.json"));
670                if !fixture_path.exists() {
671                    eprintln!("[skip] {provider}/{name}: fixture file not present");
672                    continue;
673                }
674
675                let endpoint = case["endpoint"].as_str().unwrap();
676                eprintln!("[test] {provider}/{name}");
677                if endpoint.ends_with("/responses") {
678                    assert_responses_fixture(provider, name);
679                } else {
680                    assert_fixture(provider, name);
681                }
682                ran += 1;
683            }
684            assert!(ran > 0, "Provider {provider} has no fixture files");
685        }
686    }
687
688    /// Verify that `role` sent on every chunk is not concatenated (Dynamo-style streams).
689    #[test]
690    fn role_not_concatenated() {
691        let events: Vec<eventsource_stream::Event> = vec![
692            eventsource_stream::Event {
693                data: r#"{"id":"1","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"role":"assistant","content":"Hello"}}]}"#.to_string(),
694                ..Default::default()
695            },
696            eventsource_stream::Event {
697                data: r#"{"id":"1","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"role":"assistant","content":" world"}}]}"#.to_string(),
698                ..Default::default()
699            },
700            eventsource_stream::Event {
701                data: r#"{"id":"1","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"role":"assistant","content":"!"},"finish_reason":"stop"}]}"#.to_string(),
702                ..Default::default()
703            },
704        ];
705
706        let result: Value = serde_json::from_str(&reassemble(&events).unwrap()).unwrap();
707        let message = &result["choices"][0]["message"];
708        assert_eq!(message["role"], "assistant");
709        assert_eq!(message["content"], "Hello world!");
710    }
711}