Skip to main content

openai_core/stream/
responses.rs

1use std::collections::{BTreeMap, HashMap};
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures_util::Stream;
6use serde::{Deserialize, Serialize};
7use serde_json::{Map, Value};
8
9use super::partial_json::parse_optional_json;
10use super::sse::SseStream;
11use super::value_helpers::{ensure_array_field, ensure_object, ensure_vec_len};
12use crate::error::Result;
13use crate::json_payload::JsonPayload;
14use crate::resources::Response;
15use crate::response_meta::ResponseMeta;
16
/// Streaming wrapper for the Responses API.
///
/// Wraps the raw SSE event stream and folds every successfully received event
/// into an internal accumulator so that text, function-call arguments and a
/// response snapshot can be queried while the stream is being consumed.
#[derive(Debug)]
pub struct ResponseStream {
    // Underlying SSE stream yielding raw JSON events.
    inner: SseStream<Value>,
    // Running aggregate built from the events observed so far.
    accumulator: ResponseAccumulator,
}
23
24impl ResponseStream {
25    /// 创建新的 Responses 流。
26    pub fn new(inner: SseStream<Value>) -> Self {
27        Self {
28            inner,
29            accumulator: ResponseAccumulator::default(),
30        }
31    }
32
33    /// 获取当前聚合出的输出文本。
34    pub fn output_text(&self) -> &str {
35        &self.accumulator.output_text
36    }
37
38    /// 获取已聚合的函数调用参数。
39    pub fn function_arguments(&self) -> &HashMap<String, String> {
40        &self.accumulator.function_arguments
41    }
42
43    /// 获取截至目前聚合出的响应快照。
44    pub fn snapshot(&self) -> Option<Response> {
45        self.accumulator.snapshot()
46    }
47
48    /// 消费整个流并返回最终文本快照。
49    pub async fn into_output_text(mut self) -> Result<String> {
50        while let Some(event) = futures_util::StreamExt::next(&mut self).await {
51            event?;
52        }
53        Ok(self.accumulator.output_text)
54    }
55
56    /// 消费整个流并返回最终响应快照。
57    pub async fn final_response(mut self) -> Result<Option<Response>> {
58        while let Some(event) = futures_util::StreamExt::next(&mut self).await {
59            event?;
60        }
61        Ok(self.accumulator.into_response())
62    }
63
64    /// 返回底层响应元信息。
65    pub fn meta(&self) -> &ResponseMeta {
66        self.inner.meta()
67    }
68
69    /// 把原始事件流转换为带高层语义的运行时流。
70    pub fn events(self) -> ResponseEventStream {
71        ResponseEventStream::new(self)
72    }
73}
74
75impl Stream for ResponseStream {
76    type Item = Result<Value>;
77
78    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
79        let this = self.get_mut();
80        match Pin::new(&mut this.inner).poll_next(cx) {
81            Poll::Ready(Some(Ok(event))) => {
82                this.accumulator.apply(&event);
83                Poll::Ready(Some(Ok(event)))
84            }
85            other => other,
86        }
87    }
88}
89
/// Output-text event (delta or done) derived from the raw stream.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ResponseOutputTextEvent {
    /// Original event type string.
    pub event_type: String,
    /// Index of the output item.
    pub output_index: usize,
    /// Index of the content part within the output item.
    pub content_index: usize,
    /// Text delta, or the final text for a done event.
    pub text: String,
    /// Text accumulated so far.
    pub snapshot: String,
}
104
/// Function-call arguments delta event.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ResponseFunctionCallArgumentsEvent {
    /// Index of the output item.
    pub output_index: usize,
    /// ID of the associated item.
    pub item_id: Option<String>,
    /// Arguments delta.
    pub delta: String,
    /// Arguments string accumulated so far.
    pub snapshot: String,
    /// Parsed value, provided when the current arguments are valid JSON.
    pub parsed_arguments: Option<JsonPayload>,
}
119
/// High-level events derived at runtime from a Responses stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ResponseRuntimeEvent {
    /// Raw event for which no dedicated variant was derived.
    Raw(JsonPayload),
    /// The response has been created.
    ResponseCreated(Response),
    /// An output item has been added.
    OutputItemAdded {
        /// Index of the output item.
        output_index: usize,
        /// The newly added output item.
        item: JsonPayload,
        /// Current response snapshot.
        snapshot: Response,
    },
    /// An output content part has been added.
    ContentPartAdded {
        /// Index of the output item.
        output_index: usize,
        /// Index of the content part.
        content_index: usize,
        /// The newly added content part.
        part: JsonPayload,
        /// Current response snapshot.
        snapshot: Response,
    },
    /// Output text delta.
    OutputTextDelta(ResponseOutputTextEvent),
    /// Output text finished.
    OutputTextDone(ResponseOutputTextEvent),
    /// Function-call arguments delta.
    FunctionCallArgumentsDelta(ResponseFunctionCallArgumentsEvent),
    /// The response has completed.
    Completed(Response),
}
156
/// Responses stream that yields high-level semantic events.
#[derive(Debug)]
pub struct ResponseEventStream {
    // Raw stream (with its accumulator) that this stream derives events from.
    inner: ResponseStream,
}
162
163impl ResponseEventStream {
164    fn new(inner: ResponseStream) -> Self {
165        Self { inner }
166    }
167
168    /// 返回当前累计文本。
169    pub fn output_text(&self) -> &str {
170        self.inner.output_text()
171    }
172
173    /// 返回当前聚合的函数参数。
174    pub fn function_arguments(&self) -> &HashMap<String, String> {
175        self.inner.function_arguments()
176    }
177
178    /// 返回当前响应快照。
179    pub fn snapshot(&self) -> Option<Response> {
180        self.inner.snapshot()
181    }
182
183    /// 返回底层响应元信息。
184    pub fn meta(&self) -> &ResponseMeta {
185        self.inner.meta()
186    }
187
188    /// 消费整个事件流并返回最终响应快照。
189    pub async fn final_response(mut self) -> Result<Option<Response>> {
190        while let Some(event) = futures_util::StreamExt::next(&mut self).await {
191            event?;
192        }
193        Ok(self.snapshot())
194    }
195}
196
197impl Stream for ResponseEventStream {
198    type Item = Result<ResponseRuntimeEvent>;
199
200    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
201        let this = self.get_mut();
202        match Pin::new(&mut this.inner).poll_next(cx) {
203            Poll::Ready(Some(Ok(event))) => {
204                let snapshot = this.inner.snapshot();
205                let output_text = this.inner.output_text().to_owned();
206                let function_arguments = this.inner.function_arguments().clone();
207                Poll::Ready(Some(Ok(derive_response_runtime_event(
208                    event,
209                    snapshot,
210                    &output_text,
211                    &function_arguments,
212                ))))
213            }
214            Poll::Ready(Some(Err(error))) => Poll::Ready(Some(Err(error))),
215            Poll::Ready(None) => Poll::Ready(None),
216            Poll::Pending => Poll::Pending,
217        }
218    }
219}
220
221fn derive_response_runtime_event(
222    event: Value,
223    snapshot: Option<Response>,
224    output_text: &str,
225    function_arguments: &HashMap<String, String>,
226) -> ResponseRuntimeEvent {
227    let event_type = event
228        .get("type")
229        .and_then(Value::as_str)
230        .unwrap_or_default()
231        .to_owned();
232
233    match event_type.as_str() {
234        "response.created" => snapshot
235            .map(ResponseRuntimeEvent::ResponseCreated)
236            .unwrap_or(ResponseRuntimeEvent::Raw(event.into())),
237        "response.output_item.added" => {
238            if let (Some(output_index), Some(item), Some(snapshot)) = (
239                event
240                    .get("output_index")
241                    .and_then(Value::as_u64)
242                    .map(|value| value as usize),
243                event.get("item").cloned(),
244                snapshot,
245            ) {
246                ResponseRuntimeEvent::OutputItemAdded {
247                    output_index,
248                    item: item.into(),
249                    snapshot,
250                }
251            } else {
252                ResponseRuntimeEvent::Raw(event.into())
253            }
254        }
255        "response.content_part.added" => {
256            if let (Some(output_index), Some(content_index), Some(part), Some(snapshot)) = (
257                event
258                    .get("output_index")
259                    .and_then(Value::as_u64)
260                    .map(|value| value as usize),
261                event
262                    .get("content_index")
263                    .and_then(Value::as_u64)
264                    .map(|value| value as usize),
265                event.get("part").cloned(),
266                snapshot,
267            ) {
268                ResponseRuntimeEvent::ContentPartAdded {
269                    output_index,
270                    content_index,
271                    part: part.into(),
272                    snapshot,
273                }
274            } else {
275                ResponseRuntimeEvent::Raw(event.into())
276            }
277        }
278        "response.output_text.delta" | "response.output_text.done" => {
279            let output_index = event
280                .get("output_index")
281                .and_then(Value::as_u64)
282                .map(|value| value as usize)
283                .unwrap_or_default();
284            let content_index = event
285                .get("content_index")
286                .and_then(Value::as_u64)
287                .map(|value| value as usize)
288                .unwrap_or_default();
289            let text = event
290                .get("delta")
291                .or_else(|| event.get("text"))
292                .and_then(Value::as_str)
293                .unwrap_or_default()
294                .to_owned();
295            let snapshot_text = snapshot
296                .as_ref()
297                .and_then(|response| {
298                    response_output_text_snapshot(response, output_index, content_index)
299                })
300                .filter(|snapshot_text| !snapshot_text.is_empty())
301                .unwrap_or_else(|| {
302                    if output_text.is_empty() {
303                        text.clone()
304                    } else {
305                        output_text.to_owned()
306                    }
307                });
308            let typed_event = ResponseOutputTextEvent {
309                event_type: event_type.clone(),
310                output_index,
311                content_index,
312                text,
313                snapshot: snapshot_text,
314            };
315            if event_type == "response.output_text.delta" {
316                ResponseRuntimeEvent::OutputTextDelta(typed_event)
317            } else {
318                ResponseRuntimeEvent::OutputTextDone(typed_event)
319            }
320        }
321        "response.function_call_arguments.delta" => {
322            let output_index = event
323                .get("output_index")
324                .and_then(Value::as_u64)
325                .map(|value| value as usize)
326                .unwrap_or_default();
327            let item_id = event
328                .get("item_id")
329                .or_else(|| event.get("call_id"))
330                .and_then(Value::as_str)
331                .map(str::to_owned);
332            let delta = event
333                .get("delta")
334                .and_then(Value::as_str)
335                .unwrap_or_default()
336                .to_owned();
337            let fallback_arguments = item_id
338                .as_deref()
339                .and_then(|key| function_arguments.get(key))
340                .cloned()
341                .or_else(|| function_arguments.get("default").cloned())
342                .unwrap_or_else(|| delta.clone());
343            let snapshot_arguments = snapshot
344                .as_ref()
345                .and_then(|response| response_function_arguments_snapshot(response, output_index))
346                .filter(|snapshot_arguments| !snapshot_arguments.is_empty())
347                .unwrap_or(fallback_arguments);
348            ResponseRuntimeEvent::FunctionCallArgumentsDelta(ResponseFunctionCallArgumentsEvent {
349                output_index,
350                parsed_arguments: parse_optional_json(&snapshot_arguments).map(JsonPayload::from),
351                item_id,
352                delta,
353                snapshot: snapshot_arguments,
354            })
355        }
356        "response.completed" => snapshot
357            .map(ResponseRuntimeEvent::Completed)
358            .unwrap_or(ResponseRuntimeEvent::Raw(event.into())),
359        _ => ResponseRuntimeEvent::Raw(event.into()),
360    }
361}
362
363fn response_output_text_snapshot(
364    response: &Response,
365    output_index: usize,
366    content_index: usize,
367) -> Option<String> {
368    let output = response.output.get(output_index)?;
369    if let Some(message) = output.as_message() {
370        return message
371            .content
372            .get(content_index)
373            .and_then(|item| item.text())
374            .map(str::to_owned);
375    }
376    if content_index == 0
377        && let Some(text) = output.output_text()
378    {
379        return Some(text.to_owned());
380    }
381    output
382        .as_raw()
383        .and_then(|value| value.get("content"))
384        .and_then(Value::as_array)
385        .and_then(|content| content.get(content_index))
386        .and_then(|item| item.get("text"))
387        .and_then(Value::as_str)
388        .map(str::to_owned)
389}
390
391fn response_function_arguments_snapshot(
392    response: &Response,
393    output_index: usize,
394) -> Option<String> {
395    response
396        .output
397        .get(output_index)
398        .and_then(|output| {
399            output
400                .as_function_call()
401                .map(|call| call.arguments.as_str())
402                .or_else(|| {
403                    output
404                        .as_raw()
405                        .and_then(|value| value.get("arguments"))
406                        .and_then(Value::as_str)
407                })
408        })
409        .map(str::to_owned)
410}
411
/// Running aggregate of a Responses stream, updated one raw event at a time.
#[derive(Debug, Default, Clone)]
struct ResponseAccumulator {
    // Raw response snapshot; `None` until a response payload has been parsed.
    response: Option<RawResponseSnapshot>,
    // Output text accumulated so far.
    output_text: String,
    // Accumulated function-call argument strings, keyed by the event's
    // `item_id`, else its `call_id`, else the literal "default".
    function_arguments: HashMap<String, String>,
}
418
419impl ResponseAccumulator {
420    fn snapshot(&self) -> Option<Response> {
421        self.response
422            .as_ref()
423            .and_then(RawResponseSnapshot::clone_public_response)
424    }
425
426    fn into_response(self) -> Option<Response> {
427        self.response
428            .and_then(RawResponseSnapshot::into_public_response)
429    }
430
431    fn apply(&mut self, event: &Value) {
432        let Some(event_type) = event.get("type").and_then(Value::as_str) else {
433            return;
434        };
435
436        match event_type {
437            "response.created" => {
438                if let Some(response) = event.get("response") {
439                    self.response = serde_json::from_value(response.clone()).ok();
440                    self.sync_output_text_from_snapshot();
441                }
442            }
443            "response.output_item.added" => {
444                let Some(response) = &mut self.response else {
445                    return;
446                };
447                let Some(item) = event.get("item") else {
448                    return;
449                };
450                let index = event
451                    .get("output_index")
452                    .and_then(Value::as_u64)
453                    .map(|value| value as usize)
454                    .unwrap_or(response.output.len());
455                ensure_vec_len(&mut response.output, index + 1);
456                let existing = response.output[index].clone();
457                response.output[index] = merge_response_output_item(existing, item.clone());
458                self.sync_output_text_from_snapshot();
459            }
460            "response.content_part.added" => {
461                let Some(response) = &mut self.response else {
462                    return;
463                };
464                let Some(part) = event.get("part") else {
465                    return;
466                };
467                let output_index = event
468                    .get("output_index")
469                    .and_then(Value::as_u64)
470                    .map(|value| value as usize)
471                    .unwrap_or_default();
472                let content_index = event
473                    .get("content_index")
474                    .and_then(Value::as_u64)
475                    .map(|value| value as usize)
476                    .unwrap_or_default();
477                ensure_vec_len(&mut response.output, output_index + 1);
478                if response.output[output_index].is_null() {
479                    response.output[output_index] = Value::Object(Map::new());
480                }
481                let output = &mut response.output[output_index];
482                let content = ensure_array_field(output, "content");
483                ensure_vec_len(content, content_index + 1);
484                let existing = content[content_index].clone();
485                content[content_index] = merge_response_content_part(existing, part.clone());
486                self.sync_output_text_from_snapshot();
487            }
488            "response.output_text.delta" => {
489                if let Some(delta) = event.get("delta").and_then(Value::as_str) {
490                    self.output_text.push_str(delta);
491                }
492                if let Some(response) = &mut self.response {
493                    append_response_content_text(response, event, "text", "output_text");
494                }
495            }
496            "response.output_text.done" => {
497                if self.output_text.is_empty()
498                    && let Some(text) = event.get("text").and_then(Value::as_str)
499                {
500                    self.output_text = text.to_owned();
501                }
502                if let Some(response) = &mut self.response {
503                    set_response_content_text(response, event, "text", "output_text");
504                }
505            }
506            "response.function_call_arguments.delta" => {
507                let key = event
508                    .get("item_id")
509                    .and_then(Value::as_str)
510                    .or_else(|| event.get("call_id").and_then(Value::as_str))
511                    .unwrap_or("default");
512                let delta = event.get("delta").and_then(Value::as_str).unwrap_or("");
513                self.function_arguments
514                    .entry(key.to_owned())
515                    .and_modify(|value| value.push_str(delta))
516                    .or_insert_with(|| delta.to_owned());
517                if let Some(response) = &mut self.response {
518                    append_function_call_arguments(response, event, delta);
519                }
520            }
521            "response.reasoning_text.delta" => {
522                if let Some(response) = &mut self.response {
523                    append_response_content_text(response, event, "text", "reasoning_text");
524                    self.sync_output_text_from_snapshot();
525                }
526            }
527            "response.completed" => {
528                if let Some(response) = event.get("response") {
529                    self.response = serde_json::from_value(response.clone()).ok();
530                    self.sync_output_text_from_snapshot();
531                }
532            }
533            _ => {}
534        }
535    }
536
537    fn sync_output_text_from_snapshot(&mut self) {
538        if let Some(response) = &self.response
539            && let Some(text) = response.output_text()
540        {
541            self.output_text = text;
542        }
543    }
544}
545
/// Loosely typed mirror of a response payload, used while events are still
/// being accumulated. Output items are kept as raw JSON so they can be patched
/// in place; the public `Response` is produced by a serde round trip.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
struct RawResponseSnapshot {
    // Response ID; required when deserializing (no serde default).
    pub id: String,
    pub created_at: Option<u64>,
    // Defaults to an empty string when absent from the payload.
    #[serde(default)]
    pub object: String,
    pub model: Option<String>,
    pub status: Option<String>,
    pub error: Option<Value>,
    pub incomplete_details: Option<Value>,
    pub metadata: Option<BTreeMap<String, String>>,
    // Raw output items; defaults to an empty list when absent.
    #[serde(default)]
    pub output: Vec<Value>,
    pub usage: Option<Value>,
    // Every other top-level field of the payload, kept via serde flatten.
    #[serde(flatten)]
    pub extra: BTreeMap<String, Value>,
}
563
564impl RawResponseSnapshot {
565    fn clone_public_response(&self) -> Option<Response> {
566        serde_json::to_value(self)
567            .ok()
568            .and_then(|value| serde_json::from_value(value).ok())
569    }
570
571    fn into_public_response(self) -> Option<Response> {
572        serde_json::to_value(self)
573            .ok()
574            .and_then(|value| serde_json::from_value(value).ok())
575    }
576
577    fn output_text(&self) -> Option<String> {
578        for item in &self.output {
579            if let Some(text) = item.get("text").and_then(Value::as_str) {
580                return Some(text.to_owned());
581            }
582            if let Some(content) = item.get("content").and_then(Value::as_array) {
583                for content_item in content {
584                    if let Some(text) = content_item.get("text").and_then(Value::as_str) {
585                        return Some(text.to_owned());
586                    }
587                }
588            }
589        }
590
591        self.extra
592            .get("output_text")
593            .and_then(Value::as_str)
594            .map(str::to_owned)
595    }
596}
597
598fn merge_response_output_item(existing: Value, incoming: Value) -> Value {
599    let (Some(existing_object), Some(mut incoming_object)) =
600        (existing.as_object(), incoming.as_object().cloned())
601    else {
602        return incoming;
603    };
604
605    if let Some(existing_arguments) = existing_object
606        .get("arguments")
607        .and_then(Value::as_str)
608        .filter(|value| !value.is_empty())
609    {
610        let incoming_arguments = incoming_object
611            .get("arguments")
612            .and_then(Value::as_str)
613            .unwrap_or("");
614        if incoming_arguments.is_empty() {
615            incoming_object.insert(
616                "arguments".into(),
617                Value::String(existing_arguments.to_owned()),
618            );
619        }
620    }
621
622    if let Some(existing_content) = existing_object
623        .get("content")
624        .and_then(Value::as_array)
625        .filter(|value| !value.is_empty())
626        .cloned()
627    {
628        let use_existing_content = incoming_object
629            .get("content")
630            .and_then(Value::as_array)
631            .is_none_or(Vec::is_empty);
632        if use_existing_content {
633            incoming_object.insert("content".into(), Value::Array(existing_content));
634        }
635    }
636
637    Value::Object(incoming_object)
638}
639
640fn merge_response_content_part(existing: Value, incoming: Value) -> Value {
641    let (Some(existing_object), Some(mut incoming_object)) =
642        (existing.as_object(), incoming.as_object().cloned())
643    else {
644        return incoming;
645    };
646
647    if let Some(existing_text) = existing_object
648        .get("text")
649        .and_then(Value::as_str)
650        .filter(|value| !value.is_empty())
651    {
652        let incoming_text = incoming_object
653            .get("text")
654            .and_then(Value::as_str)
655            .unwrap_or("");
656        if incoming_text.is_empty() {
657            incoming_object.insert("text".into(), Value::String(existing_text.to_owned()));
658        }
659    }
660
661    for key in ["output_text", "reasoning_text"] {
662        let Some(existing_text) = existing_object
663            .get(key)
664            .and_then(|value| value.get("text"))
665            .and_then(Value::as_str)
666            .filter(|value| !value.is_empty())
667        else {
668            continue;
669        };
670        let incoming_value = incoming_object
671            .entry(key.to_owned())
672            .or_insert_with(|| Value::Object(Map::new()));
673        let incoming_nested = ensure_object(incoming_value);
674        let incoming_text = incoming_nested
675            .get("text")
676            .and_then(Value::as_str)
677            .unwrap_or("");
678        if incoming_text.is_empty() {
679            incoming_nested.insert("text".into(), Value::String(existing_text.to_owned()));
680        }
681    }
682
683    Value::Object(incoming_object)
684}
685
686fn append_response_content_text(
687    response: &mut RawResponseSnapshot,
688    event: &Value,
689    field_name: &str,
690    default_type: &str,
691) {
692    let output_index = event
693        .get("output_index")
694        .and_then(Value::as_u64)
695        .map(|value| value as usize)
696        .unwrap_or_default();
697    let content_index = event
698        .get("content_index")
699        .and_then(Value::as_u64)
700        .map(|value| value as usize)
701        .unwrap_or_default();
702    let delta = event.get("delta").and_then(Value::as_str).unwrap_or("");
703
704    ensure_vec_len(&mut response.output, output_index + 1);
705    if response.output[output_index].is_null() {
706        response.output[output_index] = Value::Object(Map::new());
707    }
708    let output = &mut response.output[output_index];
709    let content = ensure_array_field(output, "content");
710    ensure_vec_len(content, content_index + 1);
711    if content[content_index].is_null() {
712        let mut content_map = Map::new();
713        content_map.insert("type".into(), Value::String(default_type.to_owned()));
714        content_map.insert(field_name.into(), Value::String(String::new()));
715        content[content_index] = Value::Object(content_map);
716    }
717
718    let slot = &mut content[content_index];
719    let slot_object = ensure_object(slot);
720    slot_object
721        .entry("type")
722        .or_insert_with(|| Value::String(default_type.to_owned()));
723    match field_name {
724        "text" => {
725            let text = slot_object
726                .entry("text")
727                .or_insert_with(|| Value::String(String::new()));
728            if let Some(existing) = text.as_str() {
729                *text = Value::String(format!("{existing}{delta}"));
730            } else {
731                *text = Value::String(delta.to_owned());
732            }
733        }
734        _ => {
735            let nested = slot_object
736                .entry(field_name)
737                .or_insert_with(|| Value::Object(Map::new()));
738            let nested_object = ensure_object(nested);
739            let text = nested_object
740                .entry("text")
741                .or_insert_with(|| Value::String(String::new()));
742            if let Some(existing) = text.as_str() {
743                *text = Value::String(format!("{existing}{delta}"));
744            } else {
745                *text = Value::String(delta.to_owned());
746            }
747        }
748    }
749}
750
751fn set_response_content_text(
752    response: &mut RawResponseSnapshot,
753    event: &Value,
754    field_name: &str,
755    default_type: &str,
756) {
757    let Some(text) = event.get("text").and_then(Value::as_str) else {
758        return;
759    };
760    let output_index = event
761        .get("output_index")
762        .and_then(Value::as_u64)
763        .map(|value| value as usize)
764        .unwrap_or_default();
765    let content_index = event
766        .get("content_index")
767        .and_then(Value::as_u64)
768        .map(|value| value as usize)
769        .unwrap_or_default();
770
771    ensure_vec_len(&mut response.output, output_index + 1);
772    if response.output[output_index].is_null() {
773        response.output[output_index] = Value::Object(Map::new());
774    }
775    let output = &mut response.output[output_index];
776    let content = ensure_array_field(output, "content");
777    ensure_vec_len(content, content_index + 1);
778    if content[content_index].is_null() {
779        let mut content_map = Map::new();
780        content_map.insert("type".into(), Value::String(default_type.to_owned()));
781        content[content_index] = Value::Object(content_map);
782    }
783
784    let slot = &mut content[content_index];
785    let slot_object = ensure_object(slot);
786    slot_object.insert("type".into(), Value::String(default_type.to_owned()));
787    match field_name {
788        "text" => {
789            slot_object.insert("text".into(), Value::String(text.to_owned()));
790        }
791        _ => {
792            let nested = slot_object
793                .entry(field_name)
794                .or_insert_with(|| Value::Object(Map::new()));
795            let nested_object = ensure_object(nested);
796            nested_object.insert("text".into(), Value::String(text.to_owned()));
797        }
798    }
799}
800
801fn append_function_call_arguments(response: &mut RawResponseSnapshot, event: &Value, delta: &str) {
802    let output_index = event
803        .get("output_index")
804        .and_then(Value::as_u64)
805        .map(|value| value as usize)
806        .unwrap_or_default();
807    ensure_vec_len(&mut response.output, output_index + 1);
808    if response.output[output_index].is_null() {
809        response.output[output_index] = Value::Object(Map::new());
810    }
811    let output = &mut response.output[output_index];
812    let object = ensure_object(output);
813    object
814        .entry("type")
815        .or_insert_with(|| Value::String("function_call".into()));
816    let arguments = object
817        .entry("arguments")
818        .or_insert_with(|| Value::String(String::new()));
819    if let Some(existing) = arguments.as_str() {
820        *arguments = Value::String(format!("{existing}{delta}"));
821    } else {
822        *arguments = Value::String(delta.to_owned());
823    }
824}
825
// Unit tests for the accumulator's handling of event ordering.
#[cfg(test)]
mod tests {
    use super::ResponseAccumulator;
    use serde_json::json;

    /// Feeds text and function-call deltas that arrive BEFORE the
    /// `output_item.added` / `content_part.added` events announcing their
    /// slots, and checks that the later announcements do not discard the text
    /// and arguments accumulated so far.
    #[test]
    fn test_should_keep_response_snapshot_consistent_for_out_of_order_events() {
        let mut accumulator = ResponseAccumulator::default();
        for event in [
            // Starts the snapshot with an empty output list.
            json!({
                "type": "response.created",
                "response": {
                    "id": "resp_1",
                    "object": "response",
                    "status": "in_progress",
                    "output": []
                }
            }),
            // Text delta for a slot that has not been announced yet.
            json!({
                "type": "response.output_text.delta",
                "output_index": 0,
                "content_index": 0,
                "delta": "hel"
            }),
            // Late announcement of the message item, with empty content.
            json!({
                "type": "response.output_item.added",
                "output_index": 0,
                "item": {
                    "id": "msg_1",
                    "type": "message",
                    "role": "assistant",
                    "content": []
                }
            }),
            // Late announcement of the content part, with empty text.
            json!({
                "type": "response.content_part.added",
                "output_index": 0,
                "content_index": 0,
                "part": {
                    "type": "output_text",
                    "text": ""
                }
            }),
            json!({
                "type": "response.output_text.delta",
                "output_index": 0,
                "content_index": 0,
                "delta": "lo"
            }),
            // Arguments delta for a function call that is announced afterwards.
            json!({
                "type": "response.function_call_arguments.delta",
                "output_index": 1,
                "item_id": "fc_1",
                "delta": "{\"city\":\"Sha"
            }),
            json!({
                "type": "response.output_item.added",
                "output_index": 1,
                "item": {
                    "id": "fc_1",
                    "type": "function_call",
                    "arguments": ""
                }
            }),
            json!({
                "type": "response.function_call_arguments.delta",
                "output_index": 1,
                "item_id": "fc_1",
                "delta": "nghai\"}"
            }),
        ] {
            accumulator.apply(&event);
        }

        let response = accumulator.response.clone().unwrap();
        // Both the running text and the snapshot must hold the joined deltas.
        assert_eq!(accumulator.output_text, "hello");
        assert_eq!(response.output_text().as_deref(), Some("hello"));
        // The arguments survive the late, empty item announcement.
        assert_eq!(
            response.clone_public_response().unwrap().output[1]
                .as_function_call()
                .map(|call| call.arguments.as_str()),
            Some("{\"city\":\"Shanghai\"}"),
        );
    }
}
910}