use std::collections::{BTreeMap, HashMap};
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_util::Stream;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use super::partial_json::parse_optional_json;
use super::sse::SseStream;
use super::value_helpers::{ensure_array_field, ensure_object, ensure_vec_len};
use crate::error::Result;
use crate::json_payload::JsonPayload;
use crate::resources::Response;
use crate::response_meta::ResponseMeta;
#[derive(Debug)]
pub struct ResponseStream {
inner: SseStream<Value>,
accumulator: ResponseAccumulator,
}
impl ResponseStream {
pub fn new(inner: SseStream<Value>) -> Self {
Self {
inner,
accumulator: ResponseAccumulator::default(),
}
}
pub fn output_text(&self) -> &str {
&self.accumulator.output_text
}
pub fn function_arguments(&self) -> &HashMap<String, String> {
&self.accumulator.function_arguments
}
pub fn snapshot(&self) -> Option<Response> {
self.accumulator.snapshot()
}
pub async fn into_output_text(mut self) -> Result<String> {
while let Some(event) = futures_util::StreamExt::next(&mut self).await {
event?;
}
Ok(self.accumulator.output_text)
}
pub async fn final_response(mut self) -> Result<Option<Response>> {
while let Some(event) = futures_util::StreamExt::next(&mut self).await {
event?;
}
Ok(self.accumulator.into_response())
}
pub fn meta(&self) -> &ResponseMeta {
self.inner.meta()
}
pub fn events(self) -> ResponseEventStream {
ResponseEventStream::new(self)
}
}
impl Stream for ResponseStream {
type Item = Result<Value>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
match Pin::new(&mut this.inner).poll_next(cx) {
Poll::Ready(Some(Ok(event))) => {
this.accumulator.apply(&event);
Poll::Ready(Some(Ok(event)))
}
other => other,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ResponseOutputTextEvent {
pub event_type: String,
pub output_index: usize,
pub content_index: usize,
pub text: String,
pub snapshot: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ResponseFunctionCallArgumentsEvent {
pub output_index: usize,
pub item_id: Option<String>,
pub delta: String,
pub snapshot: String,
pub parsed_arguments: Option<JsonPayload>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ResponseRuntimeEvent {
Raw(JsonPayload),
ResponseCreated(Response),
OutputItemAdded {
output_index: usize,
item: JsonPayload,
snapshot: Response,
},
ContentPartAdded {
output_index: usize,
content_index: usize,
part: JsonPayload,
snapshot: Response,
},
OutputTextDelta(ResponseOutputTextEvent),
OutputTextDone(ResponseOutputTextEvent),
FunctionCallArgumentsDelta(ResponseFunctionCallArgumentsEvent),
Completed(Response),
}
#[derive(Debug)]
pub struct ResponseEventStream {
inner: ResponseStream,
}
impl ResponseEventStream {
fn new(inner: ResponseStream) -> Self {
Self { inner }
}
pub fn output_text(&self) -> &str {
self.inner.output_text()
}
pub fn function_arguments(&self) -> &HashMap<String, String> {
self.inner.function_arguments()
}
pub fn snapshot(&self) -> Option<Response> {
self.inner.snapshot()
}
pub fn meta(&self) -> &ResponseMeta {
self.inner.meta()
}
pub async fn final_response(mut self) -> Result<Option<Response>> {
while let Some(event) = futures_util::StreamExt::next(&mut self).await {
event?;
}
Ok(self.snapshot())
}
}
impl Stream for ResponseEventStream {
type Item = Result<ResponseRuntimeEvent>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
match Pin::new(&mut this.inner).poll_next(cx) {
Poll::Ready(Some(Ok(event))) => {
let snapshot = this.inner.snapshot();
let output_text = this.inner.output_text().to_owned();
let function_arguments = this.inner.function_arguments().clone();
Poll::Ready(Some(Ok(derive_response_runtime_event(
event,
snapshot,
&output_text,
&function_arguments,
))))
}
Poll::Ready(Some(Err(error))) => Poll::Ready(Some(Err(error))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}
fn derive_response_runtime_event(
event: Value,
snapshot: Option<Response>,
output_text: &str,
function_arguments: &HashMap<String, String>,
) -> ResponseRuntimeEvent {
let event_type = event
.get("type")
.and_then(Value::as_str)
.unwrap_or_default()
.to_owned();
match event_type.as_str() {
"response.created" => snapshot
.map(ResponseRuntimeEvent::ResponseCreated)
.unwrap_or(ResponseRuntimeEvent::Raw(event.into())),
"response.output_item.added" => {
if let (Some(output_index), Some(item), Some(snapshot)) = (
event
.get("output_index")
.and_then(Value::as_u64)
.map(|value| value as usize),
event.get("item").cloned(),
snapshot,
) {
ResponseRuntimeEvent::OutputItemAdded {
output_index,
item: item.into(),
snapshot,
}
} else {
ResponseRuntimeEvent::Raw(event.into())
}
}
"response.content_part.added" => {
if let (Some(output_index), Some(content_index), Some(part), Some(snapshot)) = (
event
.get("output_index")
.and_then(Value::as_u64)
.map(|value| value as usize),
event
.get("content_index")
.and_then(Value::as_u64)
.map(|value| value as usize),
event.get("part").cloned(),
snapshot,
) {
ResponseRuntimeEvent::ContentPartAdded {
output_index,
content_index,
part: part.into(),
snapshot,
}
} else {
ResponseRuntimeEvent::Raw(event.into())
}
}
"response.output_text.delta" | "response.output_text.done" => {
let output_index = event
.get("output_index")
.and_then(Value::as_u64)
.map(|value| value as usize)
.unwrap_or_default();
let content_index = event
.get("content_index")
.and_then(Value::as_u64)
.map(|value| value as usize)
.unwrap_or_default();
let text = event
.get("delta")
.or_else(|| event.get("text"))
.and_then(Value::as_str)
.unwrap_or_default()
.to_owned();
let snapshot_text = snapshot
.as_ref()
.and_then(|response| {
response_output_text_snapshot(response, output_index, content_index)
})
.filter(|snapshot_text| !snapshot_text.is_empty())
.unwrap_or_else(|| {
if output_text.is_empty() {
text.clone()
} else {
output_text.to_owned()
}
});
let typed_event = ResponseOutputTextEvent {
event_type: event_type.clone(),
output_index,
content_index,
text,
snapshot: snapshot_text,
};
if event_type == "response.output_text.delta" {
ResponseRuntimeEvent::OutputTextDelta(typed_event)
} else {
ResponseRuntimeEvent::OutputTextDone(typed_event)
}
}
"response.function_call_arguments.delta" => {
let output_index = event
.get("output_index")
.and_then(Value::as_u64)
.map(|value| value as usize)
.unwrap_or_default();
let item_id = event
.get("item_id")
.or_else(|| event.get("call_id"))
.and_then(Value::as_str)
.map(str::to_owned);
let delta = event
.get("delta")
.and_then(Value::as_str)
.unwrap_or_default()
.to_owned();
let fallback_arguments = item_id
.as_deref()
.and_then(|key| function_arguments.get(key))
.cloned()
.or_else(|| function_arguments.get("default").cloned())
.unwrap_or_else(|| delta.clone());
let snapshot_arguments = snapshot
.as_ref()
.and_then(|response| response_function_arguments_snapshot(response, output_index))
.filter(|snapshot_arguments| !snapshot_arguments.is_empty())
.unwrap_or(fallback_arguments);
ResponseRuntimeEvent::FunctionCallArgumentsDelta(ResponseFunctionCallArgumentsEvent {
output_index,
parsed_arguments: parse_optional_json(&snapshot_arguments).map(JsonPayload::from),
item_id,
delta,
snapshot: snapshot_arguments,
})
}
"response.completed" => snapshot
.map(ResponseRuntimeEvent::Completed)
.unwrap_or(ResponseRuntimeEvent::Raw(event.into())),
_ => ResponseRuntimeEvent::Raw(event.into()),
}
}
fn response_output_text_snapshot(
response: &Response,
output_index: usize,
content_index: usize,
) -> Option<String> {
let output = response.output.get(output_index)?;
if let Some(message) = output.as_message() {
return message
.content
.get(content_index)
.and_then(|item| item.text())
.map(str::to_owned);
}
if content_index == 0
&& let Some(text) = output.output_text()
{
return Some(text.to_owned());
}
output
.as_raw()
.and_then(|value| value.get("content"))
.and_then(Value::as_array)
.and_then(|content| content.get(content_index))
.and_then(|item| item.get("text"))
.and_then(Value::as_str)
.map(str::to_owned)
}
fn response_function_arguments_snapshot(
response: &Response,
output_index: usize,
) -> Option<String> {
response
.output
.get(output_index)
.and_then(|output| {
output
.as_function_call()
.map(|call| call.arguments.as_str())
.or_else(|| {
output
.as_raw()
.and_then(|value| value.get("arguments"))
.and_then(Value::as_str)
})
})
.map(str::to_owned)
}
#[derive(Debug, Default, Clone)]
struct ResponseAccumulator {
response: Option<RawResponseSnapshot>,
output_text: String,
function_arguments: HashMap<String, String>,
}
impl ResponseAccumulator {
fn snapshot(&self) -> Option<Response> {
self.response
.as_ref()
.and_then(RawResponseSnapshot::clone_public_response)
}
fn into_response(self) -> Option<Response> {
self.response
.and_then(RawResponseSnapshot::into_public_response)
}
fn apply(&mut self, event: &Value) {
let Some(event_type) = event.get("type").and_then(Value::as_str) else {
return;
};
match event_type {
"response.created" => {
if let Some(response) = event.get("response") {
self.response = serde_json::from_value(response.clone()).ok();
self.sync_output_text_from_snapshot();
}
}
"response.output_item.added" => {
let Some(response) = &mut self.response else {
return;
};
let Some(item) = event.get("item") else {
return;
};
let index = event
.get("output_index")
.and_then(Value::as_u64)
.map(|value| value as usize)
.unwrap_or(response.output.len());
ensure_vec_len(&mut response.output, index + 1);
let existing = response.output[index].clone();
response.output[index] = merge_response_output_item(existing, item.clone());
self.sync_output_text_from_snapshot();
}
"response.content_part.added" => {
let Some(response) = &mut self.response else {
return;
};
let Some(part) = event.get("part") else {
return;
};
let output_index = event
.get("output_index")
.and_then(Value::as_u64)
.map(|value| value as usize)
.unwrap_or_default();
let content_index = event
.get("content_index")
.and_then(Value::as_u64)
.map(|value| value as usize)
.unwrap_or_default();
ensure_vec_len(&mut response.output, output_index + 1);
if response.output[output_index].is_null() {
response.output[output_index] = Value::Object(Map::new());
}
let output = &mut response.output[output_index];
let content = ensure_array_field(output, "content");
ensure_vec_len(content, content_index + 1);
let existing = content[content_index].clone();
content[content_index] = merge_response_content_part(existing, part.clone());
self.sync_output_text_from_snapshot();
}
"response.output_text.delta" => {
if let Some(delta) = event.get("delta").and_then(Value::as_str) {
self.output_text.push_str(delta);
}
if let Some(response) = &mut self.response {
append_response_content_text(response, event, "text", "output_text");
}
}
"response.output_text.done" => {
if self.output_text.is_empty()
&& let Some(text) = event.get("text").and_then(Value::as_str)
{
self.output_text = text.to_owned();
}
if let Some(response) = &mut self.response {
set_response_content_text(response, event, "text", "output_text");
}
}
"response.function_call_arguments.delta" => {
let key = event
.get("item_id")
.and_then(Value::as_str)
.or_else(|| event.get("call_id").and_then(Value::as_str))
.unwrap_or("default");
let delta = event.get("delta").and_then(Value::as_str).unwrap_or("");
self.function_arguments
.entry(key.to_owned())
.and_modify(|value| value.push_str(delta))
.or_insert_with(|| delta.to_owned());
if let Some(response) = &mut self.response {
append_function_call_arguments(response, event, delta);
}
}
"response.reasoning_text.delta" => {
if let Some(response) = &mut self.response {
append_response_content_text(response, event, "text", "reasoning_text");
self.sync_output_text_from_snapshot();
}
}
"response.completed" => {
if let Some(response) = event.get("response") {
self.response = serde_json::from_value(response.clone()).ok();
self.sync_output_text_from_snapshot();
}
}
_ => {}
}
}
fn sync_output_text_from_snapshot(&mut self) {
if let Some(response) = &self.response
&& let Some(text) = response.output_text()
{
self.output_text = text;
}
}
}
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
struct RawResponseSnapshot {
pub id: String,
pub created_at: Option<u64>,
#[serde(default)]
pub object: String,
pub model: Option<String>,
pub status: Option<String>,
pub error: Option<Value>,
pub incomplete_details: Option<Value>,
pub metadata: Option<BTreeMap<String, String>>,
#[serde(default)]
pub output: Vec<Value>,
pub usage: Option<Value>,
#[serde(flatten)]
pub extra: BTreeMap<String, Value>,
}
impl RawResponseSnapshot {
fn clone_public_response(&self) -> Option<Response> {
serde_json::to_value(self)
.ok()
.and_then(|value| serde_json::from_value(value).ok())
}
fn into_public_response(self) -> Option<Response> {
serde_json::to_value(self)
.ok()
.and_then(|value| serde_json::from_value(value).ok())
}
fn output_text(&self) -> Option<String> {
for item in &self.output {
if let Some(text) = item.get("text").and_then(Value::as_str) {
return Some(text.to_owned());
}
if let Some(content) = item.get("content").and_then(Value::as_array) {
for content_item in content {
if let Some(text) = content_item.get("text").and_then(Value::as_str) {
return Some(text.to_owned());
}
}
}
}
self.extra
.get("output_text")
.and_then(Value::as_str)
.map(str::to_owned)
}
}
fn merge_response_output_item(existing: Value, incoming: Value) -> Value {
let (Some(existing_object), Some(mut incoming_object)) =
(existing.as_object(), incoming.as_object().cloned())
else {
return incoming;
};
if let Some(existing_arguments) = existing_object
.get("arguments")
.and_then(Value::as_str)
.filter(|value| !value.is_empty())
{
let incoming_arguments = incoming_object
.get("arguments")
.and_then(Value::as_str)
.unwrap_or("");
if incoming_arguments.is_empty() {
incoming_object.insert(
"arguments".into(),
Value::String(existing_arguments.to_owned()),
);
}
}
if let Some(existing_content) = existing_object
.get("content")
.and_then(Value::as_array)
.filter(|value| !value.is_empty())
.cloned()
{
let use_existing_content = incoming_object
.get("content")
.and_then(Value::as_array)
.is_none_or(Vec::is_empty);
if use_existing_content {
incoming_object.insert("content".into(), Value::Array(existing_content));
}
}
Value::Object(incoming_object)
}
fn merge_response_content_part(existing: Value, incoming: Value) -> Value {
let (Some(existing_object), Some(mut incoming_object)) =
(existing.as_object(), incoming.as_object().cloned())
else {
return incoming;
};
if let Some(existing_text) = existing_object
.get("text")
.and_then(Value::as_str)
.filter(|value| !value.is_empty())
{
let incoming_text = incoming_object
.get("text")
.and_then(Value::as_str)
.unwrap_or("");
if incoming_text.is_empty() {
incoming_object.insert("text".into(), Value::String(existing_text.to_owned()));
}
}
for key in ["output_text", "reasoning_text"] {
let Some(existing_text) = existing_object
.get(key)
.and_then(|value| value.get("text"))
.and_then(Value::as_str)
.filter(|value| !value.is_empty())
else {
continue;
};
let incoming_value = incoming_object
.entry(key.to_owned())
.or_insert_with(|| Value::Object(Map::new()));
let incoming_nested = ensure_object(incoming_value);
let incoming_text = incoming_nested
.get("text")
.and_then(Value::as_str)
.unwrap_or("");
if incoming_text.is_empty() {
incoming_nested.insert("text".into(), Value::String(existing_text.to_owned()));
}
}
Value::Object(incoming_object)
}
fn append_response_content_text(
response: &mut RawResponseSnapshot,
event: &Value,
field_name: &str,
default_type: &str,
) {
let output_index = event
.get("output_index")
.and_then(Value::as_u64)
.map(|value| value as usize)
.unwrap_or_default();
let content_index = event
.get("content_index")
.and_then(Value::as_u64)
.map(|value| value as usize)
.unwrap_or_default();
let delta = event.get("delta").and_then(Value::as_str).unwrap_or("");
ensure_vec_len(&mut response.output, output_index + 1);
if response.output[output_index].is_null() {
response.output[output_index] = Value::Object(Map::new());
}
let output = &mut response.output[output_index];
let content = ensure_array_field(output, "content");
ensure_vec_len(content, content_index + 1);
if content[content_index].is_null() {
let mut content_map = Map::new();
content_map.insert("type".into(), Value::String(default_type.to_owned()));
content_map.insert(field_name.into(), Value::String(String::new()));
content[content_index] = Value::Object(content_map);
}
let slot = &mut content[content_index];
let slot_object = ensure_object(slot);
slot_object
.entry("type")
.or_insert_with(|| Value::String(default_type.to_owned()));
match field_name {
"text" => {
let text = slot_object
.entry("text")
.or_insert_with(|| Value::String(String::new()));
if let Some(existing) = text.as_str() {
*text = Value::String(format!("{existing}{delta}"));
} else {
*text = Value::String(delta.to_owned());
}
}
_ => {
let nested = slot_object
.entry(field_name)
.or_insert_with(|| Value::Object(Map::new()));
let nested_object = ensure_object(nested);
let text = nested_object
.entry("text")
.or_insert_with(|| Value::String(String::new()));
if let Some(existing) = text.as_str() {
*text = Value::String(format!("{existing}{delta}"));
} else {
*text = Value::String(delta.to_owned());
}
}
}
}
fn set_response_content_text(
response: &mut RawResponseSnapshot,
event: &Value,
field_name: &str,
default_type: &str,
) {
let Some(text) = event.get("text").and_then(Value::as_str) else {
return;
};
let output_index = event
.get("output_index")
.and_then(Value::as_u64)
.map(|value| value as usize)
.unwrap_or_default();
let content_index = event
.get("content_index")
.and_then(Value::as_u64)
.map(|value| value as usize)
.unwrap_or_default();
ensure_vec_len(&mut response.output, output_index + 1);
if response.output[output_index].is_null() {
response.output[output_index] = Value::Object(Map::new());
}
let output = &mut response.output[output_index];
let content = ensure_array_field(output, "content");
ensure_vec_len(content, content_index + 1);
if content[content_index].is_null() {
let mut content_map = Map::new();
content_map.insert("type".into(), Value::String(default_type.to_owned()));
content[content_index] = Value::Object(content_map);
}
let slot = &mut content[content_index];
let slot_object = ensure_object(slot);
slot_object.insert("type".into(), Value::String(default_type.to_owned()));
match field_name {
"text" => {
slot_object.insert("text".into(), Value::String(text.to_owned()));
}
_ => {
let nested = slot_object
.entry(field_name)
.or_insert_with(|| Value::Object(Map::new()));
let nested_object = ensure_object(nested);
nested_object.insert("text".into(), Value::String(text.to_owned()));
}
}
}
fn append_function_call_arguments(response: &mut RawResponseSnapshot, event: &Value, delta: &str) {
let output_index = event
.get("output_index")
.and_then(Value::as_u64)
.map(|value| value as usize)
.unwrap_or_default();
ensure_vec_len(&mut response.output, output_index + 1);
if response.output[output_index].is_null() {
response.output[output_index] = Value::Object(Map::new());
}
let output = &mut response.output[output_index];
let object = ensure_object(output);
object
.entry("type")
.or_insert_with(|| Value::String("function_call".into()));
let arguments = object
.entry("arguments")
.or_insert_with(|| Value::String(String::new()));
if let Some(existing) = arguments.as_str() {
*arguments = Value::String(format!("{existing}{delta}"));
} else {
*arguments = Value::String(delta.to_owned());
}
}
#[cfg(test)]
mod tests {
use super::ResponseAccumulator;
use serde_json::json;
#[test]
fn test_should_keep_response_snapshot_consistent_for_out_of_order_events() {
let mut accumulator = ResponseAccumulator::default();
for event in [
json!({
"type": "response.created",
"response": {
"id": "resp_1",
"object": "response",
"status": "in_progress",
"output": []
}
}),
json!({
"type": "response.output_text.delta",
"output_index": 0,
"content_index": 0,
"delta": "hel"
}),
json!({
"type": "response.output_item.added",
"output_index": 0,
"item": {
"id": "msg_1",
"type": "message",
"role": "assistant",
"content": []
}
}),
json!({
"type": "response.content_part.added",
"output_index": 0,
"content_index": 0,
"part": {
"type": "output_text",
"text": ""
}
}),
json!({
"type": "response.output_text.delta",
"output_index": 0,
"content_index": 0,
"delta": "lo"
}),
json!({
"type": "response.function_call_arguments.delta",
"output_index": 1,
"item_id": "fc_1",
"delta": "{\"city\":\"Sha"
}),
json!({
"type": "response.output_item.added",
"output_index": 1,
"item": {
"id": "fc_1",
"type": "function_call",
"arguments": ""
}
}),
json!({
"type": "response.function_call_arguments.delta",
"output_index": 1,
"item_id": "fc_1",
"delta": "nghai\"}"
}),
] {
accumulator.apply(&event);
}
let response = accumulator.response.clone().unwrap();
assert_eq!(accumulator.output_text, "hello");
assert_eq!(response.output_text().as_deref(), Some("hello"));
assert_eq!(
response.clone_public_response().unwrap().output[1]
.as_function_call()
.map(|call| call.arguments.as_str()),
Some("{\"city\":\"Shanghai\"}"),
);
}
}