use crate::client::OpenAI;
use crate::error::OpenAIError;
use crate::streaming::SseStream;
use crate::types::responses::{Response, ResponseCreateRequest, ResponseStreamEvent};
#[derive(Debug, Clone, Default)]
pub struct StreamFcMeta {
pub response_id: Option<String>,
pub error: Option<String>,
}
pub struct StreamFcHandle {
rx: tokio::sync::mpsc::Receiver<crate::types::responses::FunctionCall>,
meta: tokio::sync::watch::Receiver<StreamFcMeta>,
}
impl StreamFcHandle {
pub async fn recv(&mut self) -> Option<crate::types::responses::FunctionCall> {
self.rx.recv().await
}
pub fn response_id(&self) -> Option<String> {
self.meta.borrow().response_id.clone()
}
pub async fn error(&mut self) -> Option<String> {
let _ = self.meta.changed().await;
self.meta.borrow().error.clone()
}
pub fn error_now(&self) -> Option<String> {
self.meta.borrow().error.clone()
}
}
pub struct Responses<'a> {
client: &'a OpenAI,
}
impl<'a> Responses<'a> {
pub(crate) fn new(client: &'a OpenAI) -> Self {
Self { client }
}
pub async fn create_raw(
&self,
request: &impl serde::Serialize,
) -> Result<serde_json::Value, OpenAIError> {
self.client.post_json("/responses", request).await
}
pub async fn create_stream_raw(
&self,
request: &impl serde::Serialize,
) -> Result<crate::streaming::SseStream<serde_json::Value>, OpenAIError> {
let builder = self
.client
.request(reqwest::Method::POST, "/responses")
.header(reqwest::header::ACCEPT, "text/event-stream")
.header(reqwest::header::CACHE_CONTROL, "no-cache")
.json(request);
let response = self.client.send_raw_with_retry(builder).await?;
let response = OpenAI::check_stream_response(response).await?;
Ok(crate::streaming::SseStream::new(response))
}
pub async fn create(&self, request: ResponseCreateRequest) -> Result<Response, OpenAIError> {
self.client.post("/responses", &request).await
}
#[cfg(feature = "structured")]
pub async fn parse<T: serde::de::DeserializeOwned + schemars::JsonSchema>(
&self,
mut request: ResponseCreateRequest,
) -> Result<crate::parsing::ParsedResponse<T>, OpenAIError> {
request.text = Some(crate::types::responses::ResponseTextConfig {
format: Some(crate::parsing::text_format_from_type::<T>()),
verbosity: None,
});
let response: Response = self.client.post("/responses", &request).await?;
crate::parsing::parse_response(response)
}
pub async fn create_stream(
&self,
mut request: ResponseCreateRequest,
) -> Result<SseStream<ResponseStreamEvent>, OpenAIError> {
request.stream = Some(true);
let builder = self
.client
.request(reqwest::Method::POST, "/responses")
.header(reqwest::header::ACCEPT, "text/event-stream")
.header(reqwest::header::CACHE_CONTROL, "no-cache")
.json(&request);
let response = self.client.send_raw_with_retry(builder).await?;
let response = OpenAI::check_stream_response(response).await?;
Ok(SseStream::new(response))
}
pub async fn create_stream_fc(
&self,
request: ResponseCreateRequest,
) -> Result<StreamFcHandle, OpenAIError> {
use futures_util::StreamExt;
let mut stream = self.create_stream(request).await?;
let (fc_tx, fc_rx) = tokio::sync::mpsc::channel(16);
let (meta_tx, meta_rx) = tokio::sync::watch::channel(StreamFcMeta::default());
let spawn_future = async move {
let mut pending_name: std::collections::HashMap<i64, String> = Default::default();
let mut pending_call_id: std::collections::HashMap<i64, String> = Default::default();
let mut response_id: Option<String> = None;
let mut event_count: u32 = 0;
const MAX_EVENTS: u32 = 10_000;
loop {
event_count += 1;
if event_count > MAX_EVENTS {
let _ = meta_tx.send(StreamFcMeta {
response_id: response_id.clone(),
error: Some("exceeded 10000 events limit".into()),
});
break;
}
let event =
crate::runtime::timeout(std::time::Duration::from_secs(60), stream.next())
.await;
let event = match event {
Ok(Some(Ok(ev))) => ev,
Ok(Some(Err(e))) => {
let _ = meta_tx.send(StreamFcMeta {
response_id: response_id.clone(),
error: Some(format!("stream error: {e}")),
});
break;
}
Ok(None) => break,
Err(_) => {
let _ = meta_tx.send(StreamFcMeta {
response_id: response_id.clone(),
error: Some("timeout: no event for 60s".into()),
});
break;
}
};
match event {
ResponseStreamEvent::ResponseCreated { response: resp } => {
response_id = Some(resp.id.clone());
let _ = meta_tx.send(StreamFcMeta {
response_id: response_id.clone(),
error: None,
});
}
ResponseStreamEvent::ResponseOutputItemAdded(evt) => {
if let crate::types::responses::OutputItem::FunctionCall(fc) = &evt.item {
pending_name.insert(evt.output_index, fc.name.clone());
let cid = fc.id.as_deref().unwrap_or(&fc.call_id);
pending_call_id.insert(evt.output_index, cid.to_string());
}
}
ResponseStreamEvent::ResponseFunctionCallArgumentsDone(evt) => {
let idx = i64::from(evt.output_index);
let name = pending_name.remove(&idx).unwrap_or_default();
let call_id = pending_call_id.remove(&idx).unwrap_or_default();
let parsed_args = serde_json::from_str(&evt.arguments)
.unwrap_or(serde_json::Value::Object(Default::default()));
let fc = crate::types::responses::FunctionCall {
call_id,
name,
arguments: parsed_args,
};
if fc_tx.send(fc).await.is_err() {
break; }
}
ResponseStreamEvent::ResponseFailed(evt) => {
let msg = evt
.response
.error
.as_ref()
.map(|e| e.message.clone())
.unwrap_or_else(|| "response.failed".into());
let _ = meta_tx.send(StreamFcMeta {
response_id: response_id.clone(),
error: Some(msg),
});
break;
}
ResponseStreamEvent::ResponseCompleted(_) => {
let _ = meta_tx.send(StreamFcMeta {
response_id: response_id.clone(),
error: None,
});
break;
}
_ => {}
}
}
};
crate::runtime::spawn(spawn_future);
Ok(StreamFcHandle {
rx: fc_rx,
meta: meta_rx,
})
}
pub async fn retrieve(&self, response_id: &str) -> Result<Response, OpenAIError> {
self.client.get(&format!("/responses/{response_id}")).await
}
pub async fn cancel(&self, response_id: &str) -> Result<Response, OpenAIError> {
self.client
.post_empty(&format!("/responses/{response_id}/cancel"))
.await
}
pub async fn input_items(&self, response_id: &str) -> Result<serde_json::Value, OpenAIError> {
self.client
.get(&format!("/responses/{response_id}/input_items"))
.await
}
pub async fn count_tokens(
&self,
request: &ResponseCreateRequest,
) -> Result<serde_json::Value, OpenAIError> {
self.client.post("/responses/input_tokens", request).await
}
pub async fn compact(
&self,
body: &impl serde::Serialize,
) -> Result<serde_json::Value, OpenAIError> {
self.client.post("/responses/compact", body).await
}
pub async fn delete(&self, response_id: &str) -> Result<(), OpenAIError> {
let response = self
.client
.request(
reqwest::Method::DELETE,
&format!("/responses/{response_id}"),
)
.send()
.await?;
OpenAI::check_stream_response(response).await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use crate::OpenAI;
use crate::config::ClientConfig;
use crate::types::responses::ResponseCreateRequest;
const RESPONSE_JSON: &str = r#"{
"id": "resp-abc123",
"object": "response",
"created_at": 1677610602.0,
"model": "gpt-4o",
"output": [{
"type": "message",
"id": "msg-abc123",
"role": "assistant",
"status": "completed",
"content": [{
"type": "output_text",
"text": "Hello!",
"annotations": []
}]
}],
"status": "completed",
"usage": {
"input_tokens": 10,
"output_tokens": 2,
"total_tokens": 12
}
}"#;
#[tokio::test]
async fn test_responses_create() {
let mut server = mockito::Server::new_async().await;
let mock = server
.mock("POST", "/responses")
.match_header("authorization", "Bearer sk-test")
.with_status(200)
.with_header("content-type", "application/json")
.with_body(RESPONSE_JSON)
.create_async()
.await;
let client = OpenAI::with_config(ClientConfig::new("sk-test").base_url(server.url()));
let mut request = ResponseCreateRequest::new("gpt-4o");
request.input = Some("Hello".into());
let response = client.responses().create(request).await.unwrap();
assert_eq!(response.id, "resp-abc123");
assert_eq!(response.output_text(), "Hello!");
mock.assert_async().await;
}
#[tokio::test]
async fn test_responses_create_raw() {
let mut server = mockito::Server::new_async().await;
let mock = server
.mock("POST", "/responses")
.match_header("authorization", "Bearer sk-test")
.match_body(mockito::Matcher::Json(serde_json::json!({
"model": "gpt-4o",
"input": "Hello",
"custom_field": "extra"
})))
.with_status(200)
.with_header("content-type", "application/json")
.with_body(r#"{"id":"resp-raw","object":"response","custom_resp":99}"#)
.create_async()
.await;
let client = OpenAI::with_config(ClientConfig::new("sk-test").base_url(server.url()));
let raw = client
.responses()
.create_raw(&serde_json::json!({
"model": "gpt-4o",
"input": "Hello",
"custom_field": "extra"
}))
.await
.unwrap();
assert_eq!(raw["id"], "resp-raw");
assert_eq!(raw["custom_resp"], 99);
mock.assert_async().await;
}
#[tokio::test]
async fn test_responses_retrieve() {
let mut server = mockito::Server::new_async().await;
let mock = server
.mock("GET", "/responses/resp-abc123")
.with_status(200)
.with_header("content-type", "application/json")
.with_body(RESPONSE_JSON)
.create_async()
.await;
let client = OpenAI::with_config(ClientConfig::new("sk-test").base_url(server.url()));
let response = client.responses().retrieve("resp-abc123").await.unwrap();
assert_eq!(response.id, "resp-abc123");
mock.assert_async().await;
}
#[tokio::test]
async fn test_responses_create_with_tools() {
use crate::types::responses::{Reasoning, ResponseTool};
let mut server = mockito::Server::new_async().await;
let mock = server
.mock("POST", "/responses")
.match_header("authorization", "Bearer sk-test")
.with_status(200)
.with_header("content-type", "application/json")
.with_body(RESPONSE_JSON)
.create_async()
.await;
let client = OpenAI::with_config(ClientConfig::new("sk-test").base_url(server.url()));
let mut request = ResponseCreateRequest::new("gpt-4o");
request.input = Some("Search for Rust".into());
request.tools = Some(vec![ResponseTool::WebSearch {
search_context_size: Some("medium".into()),
user_location: None,
}]);
request.reasoning = Some(Reasoning {
effort: Some(crate::types::common::ReasoningEffort::High),
summary: None,
});
request.truncation = Some("auto".into());
let response = client.responses().create(request).await.unwrap();
assert_eq!(response.id, "resp-abc123");
mock.assert_async().await;
}
#[tokio::test]
async fn test_responses_delete() {
let mut server = mockito::Server::new_async().await;
let mock = server
.mock("DELETE", "/responses/resp-abc123")
.with_status(200)
.with_body("")
.create_async()
.await;
let client = OpenAI::with_config(ClientConfig::new("sk-test").base_url(server.url()));
client.responses().delete("resp-abc123").await.unwrap();
mock.assert_async().await;
}
}