1use std::collections::{BTreeMap, HashMap};
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures_util::Stream;
6use serde::{Deserialize, Serialize};
7use serde_json::{Map, Value};
8
9use super::partial_json::parse_optional_json;
10use super::sse::SseStream;
11use super::value_helpers::{ensure_array_field, ensure_object, ensure_vec_len};
12use crate::error::Result;
13use crate::json_payload::JsonPayload;
14use crate::resources::Response;
15use crate::response_meta::ResponseMeta;
16
17#[derive(Debug)]
19pub struct ResponseStream {
20 inner: SseStream<Value>,
21 accumulator: ResponseAccumulator,
22}
23
24impl ResponseStream {
25 pub fn new(inner: SseStream<Value>) -> Self {
27 Self {
28 inner,
29 accumulator: ResponseAccumulator::default(),
30 }
31 }
32
33 pub fn output_text(&self) -> &str {
35 &self.accumulator.output_text
36 }
37
38 pub fn function_arguments(&self) -> &HashMap<String, String> {
40 &self.accumulator.function_arguments
41 }
42
43 pub fn snapshot(&self) -> Option<Response> {
45 self.accumulator.snapshot()
46 }
47
48 pub async fn into_output_text(mut self) -> Result<String> {
50 while let Some(event) = futures_util::StreamExt::next(&mut self).await {
51 event?;
52 }
53 Ok(self.accumulator.output_text)
54 }
55
56 pub async fn final_response(mut self) -> Result<Option<Response>> {
58 while let Some(event) = futures_util::StreamExt::next(&mut self).await {
59 event?;
60 }
61 Ok(self.accumulator.into_response())
62 }
63
64 pub fn meta(&self) -> &ResponseMeta {
66 self.inner.meta()
67 }
68
69 pub fn events(self) -> ResponseEventStream {
71 ResponseEventStream::new(self)
72 }
73}
74
75impl Stream for ResponseStream {
76 type Item = Result<Value>;
77
78 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
79 let this = self.get_mut();
80 match Pin::new(&mut this.inner).poll_next(cx) {
81 Poll::Ready(Some(Ok(event))) => {
82 this.accumulator.apply(&event);
83 Poll::Ready(Some(Ok(event)))
84 }
85 other => other,
86 }
87 }
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
92pub struct ResponseOutputTextEvent {
93 pub event_type: String,
95 pub output_index: usize,
97 pub content_index: usize,
99 pub text: String,
101 pub snapshot: String,
103}
104
105#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
107pub struct ResponseFunctionCallArgumentsEvent {
108 pub output_index: usize,
110 pub item_id: Option<String>,
112 pub delta: String,
114 pub snapshot: String,
116 pub parsed_arguments: Option<JsonPayload>,
118}
119
120#[derive(Debug, Clone, Serialize, Deserialize)]
122pub enum ResponseRuntimeEvent {
123 Raw(JsonPayload),
125 ResponseCreated(Response),
127 OutputItemAdded {
129 output_index: usize,
131 item: JsonPayload,
133 snapshot: Response,
135 },
136 ContentPartAdded {
138 output_index: usize,
140 content_index: usize,
142 part: JsonPayload,
144 snapshot: Response,
146 },
147 OutputTextDelta(ResponseOutputTextEvent),
149 OutputTextDone(ResponseOutputTextEvent),
151 FunctionCallArgumentsDelta(ResponseFunctionCallArgumentsEvent),
153 Completed(Response),
155}
156
157#[derive(Debug)]
159pub struct ResponseEventStream {
160 inner: ResponseStream,
161}
162
163impl ResponseEventStream {
164 fn new(inner: ResponseStream) -> Self {
165 Self { inner }
166 }
167
168 pub fn output_text(&self) -> &str {
170 self.inner.output_text()
171 }
172
173 pub fn function_arguments(&self) -> &HashMap<String, String> {
175 self.inner.function_arguments()
176 }
177
178 pub fn snapshot(&self) -> Option<Response> {
180 self.inner.snapshot()
181 }
182
183 pub fn meta(&self) -> &ResponseMeta {
185 self.inner.meta()
186 }
187
188 pub async fn final_response(mut self) -> Result<Option<Response>> {
190 while let Some(event) = futures_util::StreamExt::next(&mut self).await {
191 event?;
192 }
193 Ok(self.snapshot())
194 }
195}
196
197impl Stream for ResponseEventStream {
198 type Item = Result<ResponseRuntimeEvent>;
199
200 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
201 let this = self.get_mut();
202 match Pin::new(&mut this.inner).poll_next(cx) {
203 Poll::Ready(Some(Ok(event))) => {
204 let snapshot = this.inner.snapshot();
205 let output_text = this.inner.output_text().to_owned();
206 let function_arguments = this.inner.function_arguments().clone();
207 Poll::Ready(Some(Ok(derive_response_runtime_event(
208 event,
209 snapshot,
210 &output_text,
211 &function_arguments,
212 ))))
213 }
214 Poll::Ready(Some(Err(error))) => Poll::Ready(Some(Err(error))),
215 Poll::Ready(None) => Poll::Ready(None),
216 Poll::Pending => Poll::Pending,
217 }
218 }
219}
220
221fn derive_response_runtime_event(
222 event: Value,
223 snapshot: Option<Response>,
224 output_text: &str,
225 function_arguments: &HashMap<String, String>,
226) -> ResponseRuntimeEvent {
227 let event_type = event
228 .get("type")
229 .and_then(Value::as_str)
230 .unwrap_or_default()
231 .to_owned();
232
233 match event_type.as_str() {
234 "response.created" => snapshot
235 .map(ResponseRuntimeEvent::ResponseCreated)
236 .unwrap_or(ResponseRuntimeEvent::Raw(event.into())),
237 "response.output_item.added" => {
238 if let (Some(output_index), Some(item), Some(snapshot)) = (
239 event
240 .get("output_index")
241 .and_then(Value::as_u64)
242 .map(|value| value as usize),
243 event.get("item").cloned(),
244 snapshot,
245 ) {
246 ResponseRuntimeEvent::OutputItemAdded {
247 output_index,
248 item: item.into(),
249 snapshot,
250 }
251 } else {
252 ResponseRuntimeEvent::Raw(event.into())
253 }
254 }
255 "response.content_part.added" => {
256 if let (Some(output_index), Some(content_index), Some(part), Some(snapshot)) = (
257 event
258 .get("output_index")
259 .and_then(Value::as_u64)
260 .map(|value| value as usize),
261 event
262 .get("content_index")
263 .and_then(Value::as_u64)
264 .map(|value| value as usize),
265 event.get("part").cloned(),
266 snapshot,
267 ) {
268 ResponseRuntimeEvent::ContentPartAdded {
269 output_index,
270 content_index,
271 part: part.into(),
272 snapshot,
273 }
274 } else {
275 ResponseRuntimeEvent::Raw(event.into())
276 }
277 }
278 "response.output_text.delta" | "response.output_text.done" => {
279 let output_index = event
280 .get("output_index")
281 .and_then(Value::as_u64)
282 .map(|value| value as usize)
283 .unwrap_or_default();
284 let content_index = event
285 .get("content_index")
286 .and_then(Value::as_u64)
287 .map(|value| value as usize)
288 .unwrap_or_default();
289 let text = event
290 .get("delta")
291 .or_else(|| event.get("text"))
292 .and_then(Value::as_str)
293 .unwrap_or_default()
294 .to_owned();
295 let snapshot_text = snapshot
296 .as_ref()
297 .and_then(|response| {
298 response_output_text_snapshot(response, output_index, content_index)
299 })
300 .filter(|snapshot_text| !snapshot_text.is_empty())
301 .unwrap_or_else(|| {
302 if output_text.is_empty() {
303 text.clone()
304 } else {
305 output_text.to_owned()
306 }
307 });
308 let typed_event = ResponseOutputTextEvent {
309 event_type: event_type.clone(),
310 output_index,
311 content_index,
312 text,
313 snapshot: snapshot_text,
314 };
315 if event_type == "response.output_text.delta" {
316 ResponseRuntimeEvent::OutputTextDelta(typed_event)
317 } else {
318 ResponseRuntimeEvent::OutputTextDone(typed_event)
319 }
320 }
321 "response.function_call_arguments.delta" => {
322 let output_index = event
323 .get("output_index")
324 .and_then(Value::as_u64)
325 .map(|value| value as usize)
326 .unwrap_or_default();
327 let item_id = event
328 .get("item_id")
329 .or_else(|| event.get("call_id"))
330 .and_then(Value::as_str)
331 .map(str::to_owned);
332 let delta = event
333 .get("delta")
334 .and_then(Value::as_str)
335 .unwrap_or_default()
336 .to_owned();
337 let fallback_arguments = item_id
338 .as_deref()
339 .and_then(|key| function_arguments.get(key))
340 .cloned()
341 .or_else(|| function_arguments.get("default").cloned())
342 .unwrap_or_else(|| delta.clone());
343 let snapshot_arguments = snapshot
344 .as_ref()
345 .and_then(|response| response_function_arguments_snapshot(response, output_index))
346 .filter(|snapshot_arguments| !snapshot_arguments.is_empty())
347 .unwrap_or(fallback_arguments);
348 ResponseRuntimeEvent::FunctionCallArgumentsDelta(ResponseFunctionCallArgumentsEvent {
349 output_index,
350 parsed_arguments: parse_optional_json(&snapshot_arguments).map(JsonPayload::from),
351 item_id,
352 delta,
353 snapshot: snapshot_arguments,
354 })
355 }
356 "response.completed" => snapshot
357 .map(ResponseRuntimeEvent::Completed)
358 .unwrap_or(ResponseRuntimeEvent::Raw(event.into())),
359 _ => ResponseRuntimeEvent::Raw(event.into()),
360 }
361}
362
363fn response_output_text_snapshot(
364 response: &Response,
365 output_index: usize,
366 content_index: usize,
367) -> Option<String> {
368 let output = response.output.get(output_index)?;
369 if let Some(message) = output.as_message() {
370 return message
371 .content
372 .get(content_index)
373 .and_then(|item| item.text())
374 .map(str::to_owned);
375 }
376 if content_index == 0
377 && let Some(text) = output.output_text()
378 {
379 return Some(text.to_owned());
380 }
381 output
382 .as_raw()
383 .and_then(|value| value.get("content"))
384 .and_then(Value::as_array)
385 .and_then(|content| content.get(content_index))
386 .and_then(|item| item.get("text"))
387 .and_then(Value::as_str)
388 .map(str::to_owned)
389}
390
391fn response_function_arguments_snapshot(
392 response: &Response,
393 output_index: usize,
394) -> Option<String> {
395 response
396 .output
397 .get(output_index)
398 .and_then(|output| {
399 output
400 .as_function_call()
401 .map(|call| call.arguments.as_str())
402 .or_else(|| {
403 output
404 .as_raw()
405 .and_then(|value| value.get("arguments"))
406 .and_then(Value::as_str)
407 })
408 })
409 .map(str::to_owned)
410}
411
412#[derive(Debug, Default, Clone)]
413struct ResponseAccumulator {
414 response: Option<RawResponseSnapshot>,
415 output_text: String,
416 function_arguments: HashMap<String, String>,
417}
418
419impl ResponseAccumulator {
420 fn snapshot(&self) -> Option<Response> {
421 self.response
422 .as_ref()
423 .and_then(RawResponseSnapshot::clone_public_response)
424 }
425
426 fn into_response(self) -> Option<Response> {
427 self.response
428 .and_then(RawResponseSnapshot::into_public_response)
429 }
430
431 fn apply(&mut self, event: &Value) {
432 let Some(event_type) = event.get("type").and_then(Value::as_str) else {
433 return;
434 };
435
436 match event_type {
437 "response.created" => {
438 if let Some(response) = event.get("response") {
439 self.response = serde_json::from_value(response.clone()).ok();
440 self.sync_output_text_from_snapshot();
441 }
442 }
443 "response.output_item.added" => {
444 let Some(response) = &mut self.response else {
445 return;
446 };
447 let Some(item) = event.get("item") else {
448 return;
449 };
450 let index = event
451 .get("output_index")
452 .and_then(Value::as_u64)
453 .map(|value| value as usize)
454 .unwrap_or(response.output.len());
455 ensure_vec_len(&mut response.output, index + 1);
456 let existing = response.output[index].clone();
457 response.output[index] = merge_response_output_item(existing, item.clone());
458 self.sync_output_text_from_snapshot();
459 }
460 "response.content_part.added" => {
461 let Some(response) = &mut self.response else {
462 return;
463 };
464 let Some(part) = event.get("part") else {
465 return;
466 };
467 let output_index = event
468 .get("output_index")
469 .and_then(Value::as_u64)
470 .map(|value| value as usize)
471 .unwrap_or_default();
472 let content_index = event
473 .get("content_index")
474 .and_then(Value::as_u64)
475 .map(|value| value as usize)
476 .unwrap_or_default();
477 ensure_vec_len(&mut response.output, output_index + 1);
478 if response.output[output_index].is_null() {
479 response.output[output_index] = Value::Object(Map::new());
480 }
481 let output = &mut response.output[output_index];
482 let content = ensure_array_field(output, "content");
483 ensure_vec_len(content, content_index + 1);
484 let existing = content[content_index].clone();
485 content[content_index] = merge_response_content_part(existing, part.clone());
486 self.sync_output_text_from_snapshot();
487 }
488 "response.output_text.delta" => {
489 if let Some(delta) = event.get("delta").and_then(Value::as_str) {
490 self.output_text.push_str(delta);
491 }
492 if let Some(response) = &mut self.response {
493 append_response_content_text(response, event, "text", "output_text");
494 }
495 }
496 "response.output_text.done" => {
497 if self.output_text.is_empty()
498 && let Some(text) = event.get("text").and_then(Value::as_str)
499 {
500 self.output_text = text.to_owned();
501 }
502 if let Some(response) = &mut self.response {
503 set_response_content_text(response, event, "text", "output_text");
504 }
505 }
506 "response.function_call_arguments.delta" => {
507 let key = event
508 .get("item_id")
509 .and_then(Value::as_str)
510 .or_else(|| event.get("call_id").and_then(Value::as_str))
511 .unwrap_or("default");
512 let delta = event.get("delta").and_then(Value::as_str).unwrap_or("");
513 self.function_arguments
514 .entry(key.to_owned())
515 .and_modify(|value| value.push_str(delta))
516 .or_insert_with(|| delta.to_owned());
517 if let Some(response) = &mut self.response {
518 append_function_call_arguments(response, event, delta);
519 }
520 }
521 "response.reasoning_text.delta" => {
522 if let Some(response) = &mut self.response {
523 append_response_content_text(response, event, "text", "reasoning_text");
524 self.sync_output_text_from_snapshot();
525 }
526 }
527 "response.completed" => {
528 if let Some(response) = event.get("response") {
529 self.response = serde_json::from_value(response.clone()).ok();
530 self.sync_output_text_from_snapshot();
531 }
532 }
533 _ => {}
534 }
535 }
536
537 fn sync_output_text_from_snapshot(&mut self) {
538 if let Some(response) = &self.response
539 && let Some(text) = response.output_text()
540 {
541 self.output_text = text;
542 }
543 }
544}
545
546#[derive(Debug, Default, Clone, Serialize, Deserialize)]
547struct RawResponseSnapshot {
548 pub id: String,
549 pub created_at: Option<u64>,
550 #[serde(default)]
551 pub object: String,
552 pub model: Option<String>,
553 pub status: Option<String>,
554 pub error: Option<Value>,
555 pub incomplete_details: Option<Value>,
556 pub metadata: Option<BTreeMap<String, String>>,
557 #[serde(default)]
558 pub output: Vec<Value>,
559 pub usage: Option<Value>,
560 #[serde(flatten)]
561 pub extra: BTreeMap<String, Value>,
562}
563
564impl RawResponseSnapshot {
565 fn clone_public_response(&self) -> Option<Response> {
566 serde_json::to_value(self)
567 .ok()
568 .and_then(|value| serde_json::from_value(value).ok())
569 }
570
571 fn into_public_response(self) -> Option<Response> {
572 serde_json::to_value(self)
573 .ok()
574 .and_then(|value| serde_json::from_value(value).ok())
575 }
576
577 fn output_text(&self) -> Option<String> {
578 for item in &self.output {
579 if let Some(text) = item.get("text").and_then(Value::as_str) {
580 return Some(text.to_owned());
581 }
582 if let Some(content) = item.get("content").and_then(Value::as_array) {
583 for content_item in content {
584 if let Some(text) = content_item.get("text").and_then(Value::as_str) {
585 return Some(text.to_owned());
586 }
587 }
588 }
589 }
590
591 self.extra
592 .get("output_text")
593 .and_then(Value::as_str)
594 .map(str::to_owned)
595 }
596}
597
598fn merge_response_output_item(existing: Value, incoming: Value) -> Value {
599 let (Some(existing_object), Some(mut incoming_object)) =
600 (existing.as_object(), incoming.as_object().cloned())
601 else {
602 return incoming;
603 };
604
605 if let Some(existing_arguments) = existing_object
606 .get("arguments")
607 .and_then(Value::as_str)
608 .filter(|value| !value.is_empty())
609 {
610 let incoming_arguments = incoming_object
611 .get("arguments")
612 .and_then(Value::as_str)
613 .unwrap_or("");
614 if incoming_arguments.is_empty() {
615 incoming_object.insert(
616 "arguments".into(),
617 Value::String(existing_arguments.to_owned()),
618 );
619 }
620 }
621
622 if let Some(existing_content) = existing_object
623 .get("content")
624 .and_then(Value::as_array)
625 .filter(|value| !value.is_empty())
626 .cloned()
627 {
628 let use_existing_content = incoming_object
629 .get("content")
630 .and_then(Value::as_array)
631 .is_none_or(Vec::is_empty);
632 if use_existing_content {
633 incoming_object.insert("content".into(), Value::Array(existing_content));
634 }
635 }
636
637 Value::Object(incoming_object)
638}
639
640fn merge_response_content_part(existing: Value, incoming: Value) -> Value {
641 let (Some(existing_object), Some(mut incoming_object)) =
642 (existing.as_object(), incoming.as_object().cloned())
643 else {
644 return incoming;
645 };
646
647 if let Some(existing_text) = existing_object
648 .get("text")
649 .and_then(Value::as_str)
650 .filter(|value| !value.is_empty())
651 {
652 let incoming_text = incoming_object
653 .get("text")
654 .and_then(Value::as_str)
655 .unwrap_or("");
656 if incoming_text.is_empty() {
657 incoming_object.insert("text".into(), Value::String(existing_text.to_owned()));
658 }
659 }
660
661 for key in ["output_text", "reasoning_text"] {
662 let Some(existing_text) = existing_object
663 .get(key)
664 .and_then(|value| value.get("text"))
665 .and_then(Value::as_str)
666 .filter(|value| !value.is_empty())
667 else {
668 continue;
669 };
670 let incoming_value = incoming_object
671 .entry(key.to_owned())
672 .or_insert_with(|| Value::Object(Map::new()));
673 let incoming_nested = ensure_object(incoming_value);
674 let incoming_text = incoming_nested
675 .get("text")
676 .and_then(Value::as_str)
677 .unwrap_or("");
678 if incoming_text.is_empty() {
679 incoming_nested.insert("text".into(), Value::String(existing_text.to_owned()));
680 }
681 }
682
683 Value::Object(incoming_object)
684}
685
686fn append_response_content_text(
687 response: &mut RawResponseSnapshot,
688 event: &Value,
689 field_name: &str,
690 default_type: &str,
691) {
692 let output_index = event
693 .get("output_index")
694 .and_then(Value::as_u64)
695 .map(|value| value as usize)
696 .unwrap_or_default();
697 let content_index = event
698 .get("content_index")
699 .and_then(Value::as_u64)
700 .map(|value| value as usize)
701 .unwrap_or_default();
702 let delta = event.get("delta").and_then(Value::as_str).unwrap_or("");
703
704 ensure_vec_len(&mut response.output, output_index + 1);
705 if response.output[output_index].is_null() {
706 response.output[output_index] = Value::Object(Map::new());
707 }
708 let output = &mut response.output[output_index];
709 let content = ensure_array_field(output, "content");
710 ensure_vec_len(content, content_index + 1);
711 if content[content_index].is_null() {
712 let mut content_map = Map::new();
713 content_map.insert("type".into(), Value::String(default_type.to_owned()));
714 content_map.insert(field_name.into(), Value::String(String::new()));
715 content[content_index] = Value::Object(content_map);
716 }
717
718 let slot = &mut content[content_index];
719 let slot_object = ensure_object(slot);
720 slot_object
721 .entry("type")
722 .or_insert_with(|| Value::String(default_type.to_owned()));
723 match field_name {
724 "text" => {
725 let text = slot_object
726 .entry("text")
727 .or_insert_with(|| Value::String(String::new()));
728 if let Some(existing) = text.as_str() {
729 *text = Value::String(format!("{existing}{delta}"));
730 } else {
731 *text = Value::String(delta.to_owned());
732 }
733 }
734 _ => {
735 let nested = slot_object
736 .entry(field_name)
737 .or_insert_with(|| Value::Object(Map::new()));
738 let nested_object = ensure_object(nested);
739 let text = nested_object
740 .entry("text")
741 .or_insert_with(|| Value::String(String::new()));
742 if let Some(existing) = text.as_str() {
743 *text = Value::String(format!("{existing}{delta}"));
744 } else {
745 *text = Value::String(delta.to_owned());
746 }
747 }
748 }
749}
750
751fn set_response_content_text(
752 response: &mut RawResponseSnapshot,
753 event: &Value,
754 field_name: &str,
755 default_type: &str,
756) {
757 let Some(text) = event.get("text").and_then(Value::as_str) else {
758 return;
759 };
760 let output_index = event
761 .get("output_index")
762 .and_then(Value::as_u64)
763 .map(|value| value as usize)
764 .unwrap_or_default();
765 let content_index = event
766 .get("content_index")
767 .and_then(Value::as_u64)
768 .map(|value| value as usize)
769 .unwrap_or_default();
770
771 ensure_vec_len(&mut response.output, output_index + 1);
772 if response.output[output_index].is_null() {
773 response.output[output_index] = Value::Object(Map::new());
774 }
775 let output = &mut response.output[output_index];
776 let content = ensure_array_field(output, "content");
777 ensure_vec_len(content, content_index + 1);
778 if content[content_index].is_null() {
779 let mut content_map = Map::new();
780 content_map.insert("type".into(), Value::String(default_type.to_owned()));
781 content[content_index] = Value::Object(content_map);
782 }
783
784 let slot = &mut content[content_index];
785 let slot_object = ensure_object(slot);
786 slot_object.insert("type".into(), Value::String(default_type.to_owned()));
787 match field_name {
788 "text" => {
789 slot_object.insert("text".into(), Value::String(text.to_owned()));
790 }
791 _ => {
792 let nested = slot_object
793 .entry(field_name)
794 .or_insert_with(|| Value::Object(Map::new()));
795 let nested_object = ensure_object(nested);
796 nested_object.insert("text".into(), Value::String(text.to_owned()));
797 }
798 }
799}
800
801fn append_function_call_arguments(response: &mut RawResponseSnapshot, event: &Value, delta: &str) {
802 let output_index = event
803 .get("output_index")
804 .and_then(Value::as_u64)
805 .map(|value| value as usize)
806 .unwrap_or_default();
807 ensure_vec_len(&mut response.output, output_index + 1);
808 if response.output[output_index].is_null() {
809 response.output[output_index] = Value::Object(Map::new());
810 }
811 let output = &mut response.output[output_index];
812 let object = ensure_object(output);
813 object
814 .entry("type")
815 .or_insert_with(|| Value::String("function_call".into()));
816 let arguments = object
817 .entry("arguments")
818 .or_insert_with(|| Value::String(String::new()));
819 if let Some(existing) = arguments.as_str() {
820 *arguments = Value::String(format!("{existing}{delta}"));
821 } else {
822 *arguments = Value::String(delta.to_owned());
823 }
824}
825
826#[cfg(test)]
827mod tests {
828 use super::ResponseAccumulator;
829 use serde_json::json;
830
831 #[test]
832 fn test_should_keep_response_snapshot_consistent_for_out_of_order_events() {
833 let mut accumulator = ResponseAccumulator::default();
834 for event in [
835 json!({
836 "type": "response.created",
837 "response": {
838 "id": "resp_1",
839 "object": "response",
840 "status": "in_progress",
841 "output": []
842 }
843 }),
844 json!({
845 "type": "response.output_text.delta",
846 "output_index": 0,
847 "content_index": 0,
848 "delta": "hel"
849 }),
850 json!({
851 "type": "response.output_item.added",
852 "output_index": 0,
853 "item": {
854 "id": "msg_1",
855 "type": "message",
856 "role": "assistant",
857 "content": []
858 }
859 }),
860 json!({
861 "type": "response.content_part.added",
862 "output_index": 0,
863 "content_index": 0,
864 "part": {
865 "type": "output_text",
866 "text": ""
867 }
868 }),
869 json!({
870 "type": "response.output_text.delta",
871 "output_index": 0,
872 "content_index": 0,
873 "delta": "lo"
874 }),
875 json!({
876 "type": "response.function_call_arguments.delta",
877 "output_index": 1,
878 "item_id": "fc_1",
879 "delta": "{\"city\":\"Sha"
880 }),
881 json!({
882 "type": "response.output_item.added",
883 "output_index": 1,
884 "item": {
885 "id": "fc_1",
886 "type": "function_call",
887 "arguments": ""
888 }
889 }),
890 json!({
891 "type": "response.function_call_arguments.delta",
892 "output_index": 1,
893 "item_id": "fc_1",
894 "delta": "nghai\"}"
895 }),
896 ] {
897 accumulator.apply(&event);
898 }
899
900 let response = accumulator.response.clone().unwrap();
901 assert_eq!(accumulator.output_text, "hello");
902 assert_eq!(response.output_text().as_deref(), Some("hello"));
903 assert_eq!(
904 response.clone_public_response().unwrap().output[1]
905 .as_function_call()
906 .map(|call| call.arguments.as_str()),
907 Some("{\"city\":\"Shanghai\"}"),
908 );
909 }
910}