1use crate::attachments::validate_request_attachments;
8use crate::provider::LlmProvider;
9use crate::streaming::{StreamBox, StreamDelta, StreamErrorKind};
10use agent_sdk_foundation::llm::{
11 ChatOutcome, ChatRequest, ChatResponse, Content, ContentBlock, Effort, StopReason,
12 ThinkingConfig, ThinkingMode, Usage,
13};
14use anyhow::{Context, Result};
15use async_trait::async_trait;
16use base64::Engine;
17use futures::{SinkExt, StreamExt};
18use reqwest::StatusCode;
19use serde::{Deserialize, Serialize};
20use std::collections::HashMap;
21use std::sync::Arc;
22use tokio::net::TcpStream;
23use tokio::sync::Mutex;
24use tokio_tungstenite::connect_async;
25use tokio_tungstenite::tungstenite::Message as WebSocketMessage;
26use tokio_tungstenite::tungstenite::client::IntoClientRequest;
27use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
28
// ---- Endpoint and protocol constants (crate-private) ----

/// Backend root used by `OpenAICodexResponsesProvider::new` when no base URL is supplied.
const DEFAULT_BASE_URL: &str = "https://chatgpt.com/backend-api";
/// Claim key inside the OAuth JWT that carries OpenAI auth data; presumably read by
/// `extract_account_id` (defined elsewhere) — verify there.
const OPENAI_CODEX_JWT_CLAIM_PATH: &str = "https://api.openai.com/auth";
/// Value of the `originator` header; also the product token at the start of `User-Agent`.
const OPENAI_CODEX_ORIGINATOR: &str = "codex_cli_rs";
/// `OpenAI-Beta` value sent on HTTP Responses requests.
const OPENAI_CODEX_RESPONSES_BETA_HEADER: &str = "responses=experimental";
/// `OpenAI-Beta` value sent on the websocket upgrade request.
const OPENAI_CODEX_RESPONSES_WEBSOCKETS_BETA_HEADER: &str = "responses_websockets=2026-02-06";
/// Header through which the server hands out (and expects back) opaque per-session turn state.
const OPENAI_CODEX_TURN_STATE_HEADER: &str = "x-codex-turn-state";
/// Error code the server uses when too many websockets are open; referenced outside this chunk.
const OPENAI_CODEX_WEBSOCKET_CONNECTION_LIMIT_REACHED_CODE: &str =
    "websocket_connection_limit_reached";

// ---- Public model identifiers ----

/// Model identifier for GPT-5.4, used by `OpenAICodexResponsesProvider::gpt54`.
pub const MODEL_GPT54: &str = "gpt-5.4";

/// Model identifier for GPT-5.3 Codex, used by both `OpenAICodexResponsesProvider::gpt53_codex`
/// and `OpenAICodexResponsesProvider::codex`.
pub const MODEL_GPT53_CODEX: &str = "gpt-5.3-codex";

/// Model identifier for GPT-5.2 Codex.
pub const MODEL_GPT52_CODEX: &str = "gpt-5.2-codex";
46
47#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize)]
49#[serde(rename_all = "lowercase")]
50pub enum ReasoningEffort {
51 Low,
52 #[default]
53 Medium,
54 High,
55 #[serde(rename = "xhigh")]
57 XHigh,
58}
59
60#[derive(Clone)]
65pub struct OpenAICodexResponsesProvider {
66 client: reqwest::Client,
67 api_key: String,
68 model: String,
69 base_url: String,
70 thinking: Option<ThinkingConfig>,
71 account_id: Option<String>,
72 websocket_sessions: Arc<Mutex<HashMap<String, Arc<Mutex<WebsocketSessionState>>>>>,
73}
74
75type CodexWebSocket = WebSocketStream<MaybeTlsStream<TcpStream>>;
76
77#[derive(Default)]
78struct WebsocketSessionState {
79 connection: Option<CodexWebSocket>,
80 last_request: Option<ApiStreamingRequest>,
81 last_response_id: Option<String>,
82 last_response_items: Vec<ApiInputItem>,
83 turn_state: Option<String>,
84 prewarmed: bool,
85 websocket_disabled: bool,
86}
87
88impl OpenAICodexResponsesProvider {
89 #[must_use]
91 pub fn new(api_key: String, model: String) -> Self {
92 Self {
93 client: reqwest::Client::new(),
94 api_key,
95 model,
96 base_url: DEFAULT_BASE_URL.to_owned(),
97 thinking: None,
98 account_id: None,
99 websocket_sessions: Arc::new(Mutex::new(HashMap::new())),
100 }
101 }
102
103 #[must_use]
105 pub fn with_base_url(api_key: String, model: String, base_url: String) -> Self {
106 Self {
107 client: reqwest::Client::new(),
108 api_key,
109 model,
110 base_url,
111 thinking: None,
112 account_id: None,
113 websocket_sessions: Arc::new(Mutex::new(HashMap::new())),
114 }
115 }
116
117 #[must_use]
119 pub fn gpt53_codex(api_key: String) -> Self {
120 Self::new(api_key, MODEL_GPT53_CODEX.to_owned())
121 }
122
123 #[must_use]
125 pub fn codex(api_key: String) -> Self {
126 Self::gpt53_codex(api_key)
127 }
128
129 #[must_use]
131 pub fn gpt54(api_key: String) -> Self {
132 Self::new(api_key, MODEL_GPT54.to_owned())
133 }
134
135 #[must_use]
137 pub const fn with_thinking(mut self, thinking: ThinkingConfig) -> Self {
138 self.thinking = Some(thinking);
139 self
140 }
141
142 #[must_use]
144 pub fn with_account_id(mut self, account_id: impl Into<String>) -> Self {
145 self.account_id = Some(account_id.into());
146 self
147 }
148
149 #[must_use]
151 pub fn with_reasoning_effort(self, effort: ReasoningEffort) -> Self {
152 self.with_thinking(ThinkingConfig::default().with_effort(map_reasoning_effort(effort)))
153 }
154
155 const fn max_output_tokens(request: &ChatRequest) -> Option<u32> {
156 if request.max_tokens_explicit {
157 Some(request.max_tokens)
158 } else {
159 None
160 }
161 }
162
163 fn build_headers(
164 &self,
165 streaming: bool,
166 session_id: Option<&str>,
167 turn_state: Option<&str>,
168 ) -> Result<reqwest::header::HeaderMap> {
169 self.build_headers_with_beta(
170 streaming,
171 session_id,
172 OPENAI_CODEX_RESPONSES_BETA_HEADER,
173 turn_state,
174 )
175 }
176
177 fn build_websocket_headers(
178 &self,
179 session_id: Option<&str>,
180 turn_state: Option<&str>,
181 ) -> Result<reqwest::header::HeaderMap> {
182 self.build_headers_with_beta(
183 false,
184 session_id,
185 OPENAI_CODEX_RESPONSES_WEBSOCKETS_BETA_HEADER,
186 turn_state,
187 )
188 }
189
190 fn build_headers_with_beta(
191 &self,
192 streaming: bool,
193 session_id: Option<&str>,
194 beta_header: &'static str,
195 turn_state: Option<&str>,
196 ) -> Result<reqwest::header::HeaderMap> {
197 use reqwest::header::{
198 ACCEPT, AUTHORIZATION, CONTENT_TYPE, HeaderMap, HeaderValue, USER_AGENT,
199 };
200
201 let account_id = self
202 .account_id
203 .clone()
204 .map_or_else(|| extract_account_id(&self.api_key), Ok)
205 .context("failed to extract chatgpt account id from OpenAI Codex OAuth token")?;
206
207 let mut headers = HeaderMap::new();
208 headers.insert(
209 AUTHORIZATION,
210 HeaderValue::from_str(&format!("Bearer {}", self.api_key))?,
211 );
212 headers.insert("chatgpt-account-id", HeaderValue::from_str(&account_id)?);
213 headers.insert("OpenAI-Beta", HeaderValue::from_static(beta_header));
214 headers.insert(
215 "originator",
216 HeaderValue::from_static(OPENAI_CODEX_ORIGINATOR),
217 );
218 headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
219 headers.insert(
220 USER_AGENT,
221 HeaderValue::from_str(&format!(
222 "{OPENAI_CODEX_ORIGINATOR}/{} ({} {})",
223 env!("CARGO_PKG_VERSION"),
224 std::env::consts::OS,
225 std::env::consts::ARCH,
226 ))?,
227 );
228 if streaming {
229 headers.insert(ACCEPT, HeaderValue::from_static("text/event-stream"));
230 }
231 if let Some(session_id) = session_id {
232 let session_id_header = HeaderValue::from_str(session_id)?;
233 headers.insert("session_id", session_id_header.clone());
234 headers.insert("x-client-request-id", session_id_header);
235 }
236 if let Some(turn_state) = turn_state {
237 headers.insert(
238 OPENAI_CODEX_TURN_STATE_HEADER,
239 HeaderValue::from_str(turn_state)?,
240 );
241 }
242
243 Ok(headers)
244 }
245
246 async fn websocket_session(&self, session_id: &str) -> Arc<Mutex<WebsocketSessionState>> {
247 let mut sessions = self.websocket_sessions.lock().await;
248 sessions
249 .entry(session_id.to_string())
250 .or_insert_with(|| Arc::new(Mutex::new(WebsocketSessionState::default())))
251 .clone()
252 }
253
254 async fn connect_websocket(
255 &self,
256 session_id: Option<&str>,
257 turn_state: Option<&str>,
258 ) -> Result<(CodexWebSocket, Option<String>)> {
259 let headers = self.build_websocket_headers(session_id, turn_state)?;
260 let url = codex_websocket_url(&self.base_url)
261 .context("failed to build OpenAI Codex websocket URL")?;
262 let mut request = url
263 .as_str()
264 .into_client_request()
265 .context("failed to build OpenAI Codex websocket request")?;
266 request.headers_mut().extend(headers);
267
268 let (stream, response) = connect_async(request)
269 .await
270 .context("failed to connect OpenAI Codex websocket")?;
271 let turn_state = response
272 .headers()
273 .get(OPENAI_CODEX_TURN_STATE_HEADER)
274 .and_then(|value| value.to_str().ok())
275 .map(ToOwned::to_owned);
276 Ok((stream, turn_state))
277 }
278
279 fn map_response(api_response: ApiResponse) -> ChatResponse {
280 let content = build_content_blocks(&api_response.output);
281 let has_tool_calls = content
282 .iter()
283 .any(|block| matches!(block, ContentBlock::ToolUse { .. }));
284 let stop_reason = if has_tool_calls {
285 Some(StopReason::ToolUse)
286 } else {
287 api_response.status.map(|status| match status {
288 ApiStatus::Completed => StopReason::EndTurn,
289 ApiStatus::Incomplete => StopReason::MaxTokens,
290 ApiStatus::Failed => StopReason::StopSequence,
291 })
292 };
293
294 ChatResponse {
295 id: api_response.id,
296 content,
297 model: api_response.model,
298 stop_reason,
299 usage: api_response.usage.map_or(
300 Usage {
301 input_tokens: 0,
302 output_tokens: 0,
303 cached_input_tokens: 0,
304 cache_creation_input_tokens: 0,
305 },
306 |usage| Usage {
307 input_tokens: usage.input_tokens,
308 output_tokens: usage.output_tokens,
309 cached_input_tokens: usage
310 .input_tokens_details
311 .as_ref()
312 .map_or(0, |details| details.cached_tokens),
313 cache_creation_input_tokens: 0,
314 },
315 ),
316 }
317 }
318}
319
320#[async_trait]
321impl LlmProvider for OpenAICodexResponsesProvider {
322 async fn chat(&self, request: ChatRequest) -> Result<ChatOutcome> {
323 let thinking_config = match self.resolve_thinking_config(request.thinking.as_ref()) {
324 Ok(thinking) => thinking,
325 Err(error) => return Ok(ChatOutcome::InvalidRequest(error.to_string())),
326 };
327 if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
328 return Ok(ChatOutcome::InvalidRequest(error.to_string()));
329 }
330 let reasoning = build_api_reasoning(thinking_config.as_ref());
331 let input = build_api_input(&request);
332 let max_output_tokens = Self::max_output_tokens(&request);
333 let prompt_cache_key = request.session_id.as_deref();
334 let tools: Option<Vec<ApiTool>> = request
335 .tools
336 .as_ref()
337 .map(|ts| ts.iter().cloned().map(convert_tool).collect());
338 let parallel_tool_calls = tools.as_ref().is_some_and(|tools| !tools.is_empty());
339
340 let api_request = ApiResponsesRequest {
341 model: &self.model,
342 instructions: request.system.as_str(),
343 input: &input,
344 tools: tools.as_deref(),
345 max_output_tokens,
346 reasoning,
347 tool_choice: Some("auto"),
348 parallel_tool_calls: parallel_tool_calls.then_some(true),
349 store: false,
350 text: Some(ApiTextSettings {
351 verbosity: "medium",
352 }),
353 include: Some(&["reasoning.encrypted_content"]),
354 prompt_cache_key,
355 };
356
357 log::debug!(
358 "OpenAI Codex request model={} max_tokens={}",
359 self.model,
360 request.max_tokens
361 );
362
363 let response = self
364 .client
365 .post(codex_url(&self.base_url))
366 .headers(self.build_headers(false, request.session_id.as_deref(), None)?)
367 .json(&api_request)
368 .send()
369 .await
370 .map_err(|e| anyhow::anyhow!("request failed: {e}"))?;
371
372 let status = response.status();
373 let bytes = response
374 .bytes()
375 .await
376 .map_err(|e| anyhow::anyhow!("failed to read response body: {e}"))?;
377
378 log::debug!(
379 "OpenAI Codex response status={} body_len={}",
380 status,
381 bytes.len()
382 );
383
384 if status == StatusCode::TOO_MANY_REQUESTS {
385 return Ok(ChatOutcome::RateLimited);
386 }
387
388 if status.is_server_error() {
389 let body = String::from_utf8_lossy(&bytes);
390 log::error!("OpenAI Codex server error status={status} body={body}");
391 return Ok(ChatOutcome::ServerError(body.into_owned()));
392 }
393
394 if status.is_client_error() {
395 let body = String::from_utf8_lossy(&bytes);
396 log::warn!("OpenAI Codex client error status={status} body={body}");
397 return Ok(ChatOutcome::InvalidRequest(body.into_owned()));
398 }
399
400 let api_response: ApiResponse = serde_json::from_slice(&bytes)
401 .map_err(|e| anyhow::anyhow!("failed to parse response: {e}"))?;
402
403 Ok(ChatOutcome::Success(Self::map_response(api_response)))
404 }
405
406 #[allow(clippy::too_many_lines)]
407 fn chat_stream(&self, request: ChatRequest) -> StreamBox<'_> {
408 Box::pin(async_stream::stream! {
409 let thinking_config = match self.resolve_thinking_config(request.thinking.as_ref()) {
410 Ok(thinking) => thinking,
411 Err(error) => {
412 yield Ok(StreamDelta::Error {
413 message: error.to_string(),
414 kind: StreamErrorKind::InvalidRequest,
415 });
416 return;
417 }
418 };
419 if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
420 yield Ok(StreamDelta::Error {
421 message: error.to_string(),
422 kind: StreamErrorKind::InvalidRequest,
423 });
424 return;
425 }
426
427 let reasoning = build_api_reasoning(thinking_config.as_ref());
428 let input = build_api_input(&request);
429 let max_output_tokens = Self::max_output_tokens(&request);
430 let tools: Option<Vec<ApiTool>> = request
431 .tools
432 .as_ref()
433 .map(|ts| ts.iter().cloned().map(convert_tool).collect());
434 let parallel_tool_calls = tools.as_ref().is_some_and(|tools| !tools.is_empty());
435 let api_request = ApiStreamingRequest {
436 model: self.model.clone(),
437 instructions: request.system.clone(),
438 input,
439 tools,
440 max_output_tokens,
441 reasoning,
442 tool_choice: Some("auto".to_string()),
443 parallel_tool_calls: parallel_tool_calls.then_some(true),
444 store: false,
445 text: Some(ApiTextSettings { verbosity: "medium" }),
446 include: Some(vec!["reasoning.encrypted_content".to_string()]),
447 prompt_cache_key: request.session_id.clone(),
448 stream: true,
449 };
450
451 log::debug!("OpenAI Codex streaming request model={} max_tokens={}", self.model, request.max_tokens);
452
453 let mut sse_turn_state: Option<String> = None;
454
455 if let Some(session_id) = request.session_id.as_deref() {
456 let session = self.websocket_session(session_id).await;
457 let mut websocket_session = session.lock().await;
458
459 if !websocket_session.websocket_disabled {
460 'websocket_attempts: for attempt in 0..2 {
461 if websocket_session.connection.is_none() {
462 match self
463 .connect_websocket(
464 Some(session_id),
465 websocket_session.turn_state.as_deref(),
466 )
467 .await
468 {
469 Ok((connection, turn_state)) => {
470 websocket_session.connection = Some(connection);
471 if let Some(turn_state) = turn_state {
472 websocket_session.turn_state = Some(turn_state);
473 }
474 websocket_session.prewarmed = false;
475 }
476 Err(error) => {
477 log::warn!(
478 "OpenAI Codex websocket connect failed on attempt {}: {error:#}",
479 attempt + 1,
480 );
481 if attempt == 1 {
482 websocket_session.websocket_disabled = true;
483 }
484 continue;
485 }
486 }
487 }
488
489 if websocket_session.connection.is_some()
490 && websocket_session.last_request.is_none()
491 && !websocket_session.prewarmed
492 {
493 let mut warmup_request = ApiWebsocketRequest::from(&api_request);
494 warmup_request.generate = Some(false);
495 let warmup_payload = match serde_json::to_string(&warmup_request) {
496 Ok(payload) => payload,
497 Err(error) => {
498 yield Ok(StreamDelta::Error {
499 message: format!(
500 "failed to encode websocket warmup request: {error}"
501 ),
502 kind: StreamErrorKind::InvalidRequest,
503 });
504 return;
505 }
506 };
507
508 let warmup_send_result = if let Some(connection) =
509 websocket_session.connection.as_mut()
510 {
511 connection.send(WebSocketMessage::Text(warmup_payload.into())).await
512 } else {
513 Err(tokio_tungstenite::tungstenite::Error::ConnectionClosed)
514 };
515
516 if let Err(error) = warmup_send_result {
517 log::warn!(
518 "OpenAI Codex websocket warmup send failed on attempt {}: {error}",
519 attempt + 1,
520 );
521 reset_websocket_connection(&mut websocket_session);
522 if attempt == 1 {
523 websocket_session.websocket_disabled = true;
524 }
525 continue;
526 }
527
528 let mut warmup_response_id: Option<String> = None;
529 let mut warmup_response_items = Vec::new();
530
531 loop {
532 let message_result = if let Some(connection) =
533 websocket_session.connection.as_mut()
534 {
535 connection.next().await
536 } else {
537 None
538 };
539 let Some(message_result) = message_result else {
540 log::warn!(
541 "OpenAI Codex websocket warmup closed before completion on attempt {}",
542 attempt + 1,
543 );
544 reset_websocket_connection(&mut websocket_session);
545 if attempt == 1 {
546 websocket_session.websocket_disabled = true;
547 }
548 continue 'websocket_attempts;
549 };
550
551 let message = match message_result {
552 Ok(message) => message,
553 Err(error) => {
554 log::warn!(
555 "OpenAI Codex websocket warmup failed on attempt {}: {error}",
556 attempt + 1,
557 );
558 reset_websocket_connection(&mut websocket_session);
559 if attempt == 1 {
560 websocket_session.websocket_disabled = true;
561 }
562 continue 'websocket_attempts;
563 }
564 };
565
566 match message {
567 WebSocketMessage::Text(text) => {
568 if let Some((status, message)) =
569 parse_wrapped_websocket_error_event(&text)
570 {
571 log::warn!(
572 "OpenAI Codex websocket warmup wrapped error on attempt {} status={} message={message}",
573 attempt + 1,
574 status,
575 );
576 if status == StatusCode::UNAUTHORIZED
577 || status == StatusCode::UPGRADE_REQUIRED
578 || status.is_client_error()
579 {
580 websocket_session.websocket_disabled = true;
581 }
582 reset_websocket_connection(&mut websocket_session);
583 continue 'websocket_attempts;
584 }
585 if let Ok(event) =
586 serde_json::from_str::<ApiStreamEvent>(&text)
587 {
588 match event.r#type.as_str() {
589 "response.output_item.added" => {
590 if let Some(item) = event.item
591 && let Ok(item) =
592 serde_json::from_value::<ApiOutputItem>(item)
593 && let Some(item) =
594 output_item_to_input_item(item)
595 {
596 warmup_response_items.push(item);
597 }
598 }
599 "response.completed" | "response.done" => {
600 if let Some(resp) = event.response
601 && let Some(id) = resp.id
602 {
603 warmup_response_id = Some(id);
604 }
605 websocket_session.last_request =
606 Some(api_request.clone());
607 websocket_session.last_response_id =
608 warmup_response_id;
609 websocket_session.last_response_items =
610 warmup_response_items;
611 websocket_session.prewarmed = true;
612 break;
613 }
614 "response.incomplete" | "response.failed" => {
615 log::warn!(
616 "OpenAI Codex websocket warmup returned {} on attempt {}",
617 event.r#type,
618 attempt + 1,
619 );
620 reset_websocket_connection(&mut websocket_session);
621 if attempt == 1 {
622 websocket_session.websocket_disabled = true;
623 }
624 continue 'websocket_attempts;
625 }
626 _ => {}
627 }
628 }
629 }
630 WebSocketMessage::Binary(bytes) => {
631 if let Ok(text) = String::from_utf8(bytes.to_vec()) {
632 if let Some((status, message)) =
633 parse_wrapped_websocket_error_event(&text)
634 {
635 log::warn!(
636 "OpenAI Codex websocket warmup wrapped error on attempt {} status={} message={message}",
637 attempt + 1,
638 status,
639 );
640 if status == StatusCode::UNAUTHORIZED
641 || status == StatusCode::UPGRADE_REQUIRED
642 || status.is_client_error()
643 {
644 websocket_session.websocket_disabled = true;
645 }
646 reset_websocket_connection(&mut websocket_session);
647 continue 'websocket_attempts;
648 }
649
650 if let Ok(event) =
651 serde_json::from_str::<ApiStreamEvent>(&text)
652 {
653 match event.r#type.as_str() {
654 "response.output_item.added" => {
655 if let Some(item) = event.item
656 && let Ok(item) = serde_json::from_value::<
657 ApiOutputItem,
658 >(item)
659 && let Some(item) =
660 output_item_to_input_item(item)
661 {
662 warmup_response_items.push(item);
663 }
664 }
665 "response.completed" | "response.done" => {
666 if let Some(resp) = event.response
667 && let Some(id) = resp.id
668 {
669 warmup_response_id = Some(id);
670 }
671 websocket_session.last_request =
672 Some(api_request.clone());
673 websocket_session.last_response_id =
674 warmup_response_id;
675 websocket_session.last_response_items =
676 warmup_response_items;
677 websocket_session.prewarmed = true;
678 break;
679 }
680 "response.incomplete" | "response.failed" => {
681 log::warn!(
682 "OpenAI Codex websocket warmup returned {} on attempt {}",
683 event.r#type,
684 attempt + 1,
685 );
686 reset_websocket_connection(&mut websocket_session);
687 if attempt == 1 {
688 websocket_session.websocket_disabled = true;
689 }
690 continue 'websocket_attempts;
691 }
692 _ => {}
693 }
694 }
695 }
696 }
697 WebSocketMessage::Ping(payload) => {
698 if let Some(connection) =
699 websocket_session.connection.as_mut()
700 && let Err(error) = connection
701 .send(WebSocketMessage::Pong(payload))
702 .await
703 {
704 log::warn!(
705 "OpenAI Codex websocket warmup pong failed on attempt {}: {error}",
706 attempt + 1,
707 );
708 reset_websocket_connection(&mut websocket_session);
709 if attempt == 1 {
710 websocket_session.websocket_disabled = true;
711 }
712 continue 'websocket_attempts;
713 }
714 }
715 WebSocketMessage::Pong(_) | WebSocketMessage::Frame(_) => {}
716 WebSocketMessage::Close(_) => {
717 log::warn!(
718 "OpenAI Codex websocket warmup closed on attempt {}",
719 attempt + 1,
720 );
721 reset_websocket_connection(&mut websocket_session);
722 if attempt == 1 {
723 websocket_session.websocket_disabled = true;
724 }
725 continue 'websocket_attempts;
726 }
727 }
728 }
729 }
730
731 let websocket_request = prepare_websocket_request(
732 &api_request,
733 &websocket_session,
734 websocket_session.prewarmed,
735 );
736 let request_payload = match serde_json::to_string(&websocket_request) {
737 Ok(payload) => payload,
738 Err(error) => {
739 yield Ok(StreamDelta::Error {
740 message: format!(
741 "failed to encode websocket request: {error}"
742 ),
743 kind: StreamErrorKind::InvalidRequest,
744 });
745 return;
746 }
747 };
748
749 let send_result = if let Some(connection) = websocket_session.connection.as_mut() {
750 connection.send(WebSocketMessage::Text(request_payload.into())).await
751 } else {
752 Err(tokio_tungstenite::tungstenite::Error::ConnectionClosed)
753 };
754
755 if let Err(error) = send_result {
756 log::warn!(
757 "OpenAI Codex websocket send failed on attempt {}: {error}",
758 attempt + 1,
759 );
760 reset_websocket_connection(&mut websocket_session);
761 if attempt == 1 {
762 websocket_session.websocket_disabled = true;
763 }
764 continue;
765 }
766
767 let mut usage: Option<Usage> = None;
768 let mut tool_calls: HashMap<String, ToolCallAccumulator> = HashMap::new();
769 let mut response_id: Option<String> = None;
770 let mut response_items = Vec::new();
771 let mut emitted_output = false;
772
773 loop {
774 let message_result = if let Some(connection) =
775 websocket_session.connection.as_mut()
776 {
777 connection.next().await
778 } else {
779 None
780 };
781 let Some(message_result) = message_result else {
782 if emitted_output {
783 reset_websocket_connection(&mut websocket_session);
784 yield Ok(StreamDelta::Error {
785 message: "websocket closed before response.completed"
786 .to_string(),
787 kind: StreamErrorKind::ServerError,
788 });
789 return;
790 }
791 reset_websocket_connection(&mut websocket_session);
792 if attempt == 1 {
793 websocket_session.websocket_disabled = true;
794 }
795 continue 'websocket_attempts;
796 };
797
798 let message = match message_result {
799 Ok(message) => message,
800 Err(error) => {
801 if emitted_output {
802 reset_websocket_connection(&mut websocket_session);
803 yield Ok(StreamDelta::Error {
804 message: format!("websocket error: {error}"),
805 kind: StreamErrorKind::ServerError,
806 });
807 return;
808 }
809 reset_websocket_connection(&mut websocket_session);
810 if attempt == 1 {
811 websocket_session.websocket_disabled = true;
812 }
813 continue 'websocket_attempts;
814 }
815 };
816
817 match message {
818 WebSocketMessage::Text(text) => {
819 if let Some((status, message)) =
820 parse_wrapped_websocket_error_event(&text)
821 {
822 let kind = if status == StatusCode::TOO_MANY_REQUESTS {
823 StreamErrorKind::RateLimited
824 } else if status.is_server_error() {
825 StreamErrorKind::ServerError
826 } else {
827 StreamErrorKind::InvalidRequest
828 };
829 if emitted_output {
830 reset_websocket_connection(&mut websocket_session);
831 yield Ok(StreamDelta::Error {
832 message,
833 kind,
834 });
835 return;
836 }
837 if status == StatusCode::UNAUTHORIZED
838 || status == StatusCode::UPGRADE_REQUIRED
839 || status.is_client_error()
840 {
841 websocket_session.websocket_disabled = true;
842 }
843 reset_websocket_connection(&mut websocket_session);
844 continue 'websocket_attempts;
845 }
846 if let Ok(event) = serde_json::from_str::<ApiStreamEvent>(&text) {
847 match event.r#type.as_str() {
848 "response.output_text.delta" => {
849 if let Some(delta) = event.delta {
850 emitted_output = true;
851 yield Ok(StreamDelta::TextDelta {
852 delta,
853 block_index: 0,
854 });
855 }
856 }
857 "response.function_call_arguments.delta" => {
858 if let (Some(call_id), Some(delta)) =
859 (event.call_id, event.delta)
860 {
861 let acc = tool_calls
862 .entry(call_id.clone())
863 .or_insert_with(|| ToolCallAccumulator {
864 id: call_id,
865 name: event.name.unwrap_or_default(),
866 arguments: String::new(),
867 });
868 acc.arguments.push_str(&delta);
869 }
870 }
871 "response.output_item.added" => {
872 if let Some(item) = event.item
873 && let Ok(item) =
874 serde_json::from_value::<ApiOutputItem>(item)
875 && let Some(item) = output_item_to_input_item(item)
876 {
877 response_items.push(item);
878 }
879 }
880 "response.completed"
881 | "response.incomplete"
882 | "response.done" => {
883 if let Some(resp) = event.response {
884 if let Some(u) = resp.usage {
885 usage = Some(usage_from_api_usage(&u));
886 }
887 if let Some(id) = resp.id {
888 response_id = Some(id);
889 }
890 }
891 let final_status = Some(match event.r#type.as_str() {
892 "response.incomplete" => ApiStatus::Incomplete,
893 _ => ApiStatus::Completed,
894 });
895 for delta in emit_accumulated_tool_calls(&tool_calls) {
896 yield Ok(delta);
897 }
898 if let Some(u) = usage.take() {
899 yield Ok(StreamDelta::Usage(u));
900 }
901 websocket_session.last_request = Some(api_request.clone());
902 websocket_session.last_response_id = response_id;
903 websocket_session.last_response_items = response_items;
904 websocket_session.prewarmed = false;
905 yield Ok(StreamDelta::Done {
906 stop_reason: Some(stop_reason_from_stream_state(
907 &tool_calls,
908 final_status,
909 )),
910 });
911 return;
912 }
913 "response.failed" => {
914 websocket_session.last_request = None;
915 websocket_session.last_response_id = None;
916 websocket_session.last_response_items.clear();
917 websocket_session.prewarmed = false;
918 let message = event
919 .response
920 .and_then(|resp| resp.error)
921 .and_then(|error| error.message)
922 .unwrap_or_else(|| {
923 "Codex response failed".to_string()
924 });
925 yield Ok(StreamDelta::Error {
926 message,
927 kind: StreamErrorKind::ServerError,
928 });
929 return;
930 }
931 _ => {}
932 }
933 }
934 }
935 WebSocketMessage::Binary(bytes) => {
936 if let Ok(text) = String::from_utf8(bytes.to_vec()) {
937 if let Some((status, message)) =
938 parse_wrapped_websocket_error_event(&text)
939 {
940 let kind = if status == StatusCode::TOO_MANY_REQUESTS {
941 StreamErrorKind::RateLimited
942 } else if status.is_server_error() {
943 StreamErrorKind::ServerError
944 } else {
945 StreamErrorKind::InvalidRequest
946 };
947 if emitted_output {
948 reset_websocket_connection(&mut websocket_session);
949 yield Ok(StreamDelta::Error {
950 message,
951 kind,
952 });
953 return;
954 }
955 if status == StatusCode::UNAUTHORIZED
956 || status == StatusCode::UPGRADE_REQUIRED
957 || status.is_client_error()
958 {
959 websocket_session.websocket_disabled = true;
960 }
961 reset_websocket_connection(&mut websocket_session);
962 continue 'websocket_attempts;
963 }
964
965 if let Ok(event) =
966 serde_json::from_str::<ApiStreamEvent>(&text)
967 {
968 match event.r#type.as_str() {
969 "response.output_text.delta" => {
970 if let Some(delta) = event.delta {
971 emitted_output = true;
972 yield Ok(StreamDelta::TextDelta {
973 delta,
974 block_index: 0,
975 });
976 }
977 }
978 "response.function_call_arguments.delta" => {
979 if let (Some(call_id), Some(delta)) =
980 (event.call_id, event.delta)
981 {
982 let acc = tool_calls
983 .entry(call_id.clone())
984 .or_insert_with(|| ToolCallAccumulator {
985 id: call_id,
986 name: event.name.unwrap_or_default(),
987 arguments: String::new(),
988 });
989 acc.arguments.push_str(&delta);
990 }
991 }
992 "response.output_item.added" => {
993 if let Some(item) = event.item
994 && let Ok(item) = serde_json::from_value::<
995 ApiOutputItem,
996 >(item)
997 && let Some(item) =
998 output_item_to_input_item(item)
999 {
1000 response_items.push(item);
1001 }
1002 }
1003 "response.completed"
1004 | "response.incomplete"
1005 | "response.done" => {
1006 if let Some(resp) = event.response {
1007 if let Some(u) = resp.usage {
1008 usage = Some(usage_from_api_usage(&u));
1009 }
1010 if let Some(id) = resp.id {
1011 response_id = Some(id);
1012 }
1013 }
1014 let final_status = Some(
1015 match event.r#type.as_str() {
1016 "response.incomplete" => ApiStatus::Incomplete,
1017 _ => ApiStatus::Completed,
1018 },
1019 );
1020 for delta in
1021 emit_accumulated_tool_calls(&tool_calls)
1022 {
1023 yield Ok(delta);
1024 }
1025 if let Some(u) = usage.take() {
1026 yield Ok(StreamDelta::Usage(u));
1027 }
1028 websocket_session.last_request =
1029 Some(api_request.clone());
1030 websocket_session.last_response_id = response_id;
1031 websocket_session.last_response_items =
1032 response_items;
1033 websocket_session.prewarmed = false;
1034 yield Ok(StreamDelta::Done {
1035 stop_reason: Some(
1036 stop_reason_from_stream_state(
1037 &tool_calls,
1038 final_status,
1039 ),
1040 ),
1041 });
1042 return;
1043 }
1044 "response.failed" => {
1045 websocket_session.last_request = None;
1046 websocket_session.last_response_id = None;
1047 websocket_session.last_response_items.clear();
1048 websocket_session.prewarmed = false;
1049 let message = event
1050 .response
1051 .and_then(|resp| resp.error)
1052 .and_then(|error| error.message)
1053 .unwrap_or_else(|| {
1054 "Codex response failed".to_string()
1055 });
1056 yield Ok(StreamDelta::Error {
1057 message,
1058 kind: StreamErrorKind::ServerError,
1059 });
1060 return;
1061 }
1062 _ => {}
1063 }
1064 }
1065 }
1066 }
1067 WebSocketMessage::Ping(payload) => {
1068 if let Some(connection) = websocket_session.connection.as_mut()
1069 && let Err(error) = connection
1070 .send(WebSocketMessage::Pong(payload))
1071 .await
1072 {
1073 if emitted_output {
1074 reset_websocket_connection(&mut websocket_session);
1075 yield Ok(StreamDelta::Error {
1076 message: format!("websocket pong failed: {error}"),
1077 kind: StreamErrorKind::ServerError,
1078 });
1079 return;
1080 }
1081 reset_websocket_connection(&mut websocket_session);
1082 if attempt == 1 {
1083 websocket_session.websocket_disabled = true;
1084 }
1085 continue 'websocket_attempts;
1086 }
1087 }
1088 WebSocketMessage::Pong(_) | WebSocketMessage::Frame(_) => {}
1089 WebSocketMessage::Close(_) => {
1090 if emitted_output {
1091 reset_websocket_connection(&mut websocket_session);
1092 yield Ok(StreamDelta::Error {
1093 message: "websocket closed before response.completed"
1094 .to_string(),
1095 kind: StreamErrorKind::ServerError,
1096 });
1097 return;
1098 }
1099 reset_websocket_connection(&mut websocket_session);
1100 if attempt == 1 {
1101 websocket_session.websocket_disabled = true;
1102 }
1103 continue 'websocket_attempts;
1104 }
1105 }
1106 }
1107 }
1108 }
1109 sse_turn_state = websocket_session.turn_state.clone();
1110 drop(websocket_session);
1111 }
1112
1113 let headers = match self.build_headers(
1114 true,
1115 request.session_id.as_deref(),
1116 sse_turn_state.as_deref(),
1117 ) {
1118 Ok(headers) => headers,
1119 Err(error) => {
1120 yield Ok(StreamDelta::Error {
1121 message: error.to_string(),
1122 kind: StreamErrorKind::InvalidRequest,
1123 });
1124 return;
1125 }
1126 };
1127
1128 let Ok(response) = self.client
1129 .post(codex_url(&self.base_url))
1130 .headers(headers)
1131 .json(&api_request)
1132 .send()
1133 .await
1134 else {
1135 yield Err(anyhow::anyhow!("request failed"));
1136 return;
1137 };
1138
1139 let status = response.status();
1140 if !status.is_success() {
1141 let body = response.text().await.unwrap_or_default();
1142 let kind = if status == StatusCode::TOO_MANY_REQUESTS {
1143 StreamErrorKind::RateLimited
1144 } else if status.is_server_error() {
1145 StreamErrorKind::ServerError
1146 } else {
1147 StreamErrorKind::InvalidRequest
1148 };
1149 log::warn!("OpenAI Codex error status={status} body={body}");
1150 yield Ok(StreamDelta::Error { message: body, kind });
1151 return;
1152 }
1153
1154 if let Some(session_id) = request.session_id.as_deref() {
1155 let turn_state = response
1156 .headers()
1157 .get(OPENAI_CODEX_TURN_STATE_HEADER)
1158 .and_then(|value| value.to_str().ok())
1159 .map(ToOwned::to_owned);
1160 if let Some(turn_state) = turn_state {
1161 let session = self.websocket_session(session_id).await;
1162 let mut websocket_session = session.lock().await;
1163 websocket_session.turn_state = Some(turn_state);
1164 }
1165 }
1166
1167 let mut buffer = String::new();
1168 let mut stream = response.bytes_stream();
1169 let mut usage: Option<Usage> = None;
1170 let mut tool_calls: HashMap<String, ToolCallAccumulator> = HashMap::new();
1171 let mut final_status: Option<ApiStatus> = None;
1172
1173 while let Some(chunk_result) = stream.next().await {
1174 let Ok(chunk) = chunk_result else {
1175 yield Err(anyhow::anyhow!("stream error"));
1176 return;
1177 };
1178 buffer.push_str(&String::from_utf8_lossy(&chunk));
1179
1180 while let Some(pos) = buffer.find('\n') {
1181 let line = buffer[..pos].trim().to_string();
1182 buffer = buffer[pos + 1..].to_string();
1183 if line.is_empty() {
1184 continue;
1185 }
1186
1187 let Some(data) = line.strip_prefix("data: ") else {
1188 continue;
1189 };
1190
1191 if data == "[DONE]" {
1192 for delta in emit_accumulated_tool_calls(&tool_calls) {
1193 yield Ok(delta);
1194 }
1195 if let Some(u) = usage.take() {
1196 yield Ok(StreamDelta::Usage(u));
1197 }
1198 yield Ok(StreamDelta::Done {
1199 stop_reason: Some(stop_reason_from_stream_state(&tool_calls, final_status)),
1200 });
1201 return;
1202 }
1203
1204 if let Ok(event) = serde_json::from_str::<ApiStreamEvent>(data) {
1205 match event.r#type.as_str() {
1206 "response.output_text.delta" => {
1207 if let Some(delta) = event.delta {
1208 yield Ok(StreamDelta::TextDelta { delta, block_index: 0 });
1209 }
1210 }
1211 "response.function_call_arguments.delta" => {
1212 if let (Some(call_id), Some(delta)) = (event.call_id, event.delta) {
1213 let acc = tool_calls.entry(call_id.clone()).or_insert_with(|| {
1214 ToolCallAccumulator {
1215 id: call_id,
1216 name: event.name.unwrap_or_default(),
1217 arguments: String::new(),
1218 }
1219 });
1220 acc.arguments.push_str(&delta);
1221 }
1222 }
1223 "response.completed" | "response.incomplete" | "response.done" => {
1224 if let Some(resp) = event.response
1225 && let Some(u) = resp.usage
1226 {
1227 usage = Some(usage_from_api_usage(&u));
1228 }
1229 final_status = Some(match event.r#type.as_str() {
1230 "response.incomplete" => ApiStatus::Incomplete,
1231 _ => ApiStatus::Completed,
1232 });
1233 }
1234 "response.failed" => {
1235 let message = event
1236 .response
1237 .and_then(|resp| resp.error)
1238 .and_then(|error| error.message)
1239 .unwrap_or_else(|| "Codex response failed".to_string());
1240 yield Ok(StreamDelta::Error {
1241 message,
1242 kind: StreamErrorKind::ServerError,
1243 });
1244 return;
1245 }
1246 _ => {}
1247 }
1248 }
1249 }
1250 }
1251
1252 if let Some(u) = usage {
1253 yield Ok(StreamDelta::Usage(u));
1254 }
1255 yield Ok(StreamDelta::Done {
1256 stop_reason: Some(stop_reason_from_stream_state(&tool_calls, final_status)),
1257 });
1258 })
1259 }
1260
1261 fn model(&self) -> &str {
1262 &self.model
1263 }
1264
1265 fn provider(&self) -> &'static str {
1266 "openai-codex"
1267 }
1268
1269 fn configured_thinking(&self) -> Option<&ThinkingConfig> {
1270 self.thinking.as_ref()
1271 }
1272}
1273
1274fn build_api_input(request: &ChatRequest) -> Vec<ApiInputItem> {
1279 let mut items = Vec::new();
1280
1281 for msg in &request.messages {
1284 match &msg.content {
1285 Content::Text(text) => {
1286 items.push(ApiInputItem::Message(ApiMessage {
1287 role: match msg.role {
1288 agent_sdk_foundation::llm::Role::User => ApiRole::User,
1289 agent_sdk_foundation::llm::Role::Assistant => ApiRole::Assistant,
1290 },
1291 content: ApiMessageContent::Text(text.clone()),
1292 }));
1293 }
1294 Content::Blocks(blocks) => {
1295 let mut content_parts = Vec::new();
1296
1297 for block in blocks {
1298 match block {
1299 ContentBlock::Text { text } => {
1300 let part = match msg.role {
1301 agent_sdk_foundation::llm::Role::Assistant => {
1302 ApiInputContent::OutputText { text: text.clone() }
1303 }
1304 agent_sdk_foundation::llm::Role::User => {
1305 ApiInputContent::InputText { text: text.clone() }
1306 }
1307 };
1308 content_parts.push(part);
1309 }
1310 ContentBlock::Thinking { .. } | ContentBlock::RedactedThinking { .. } => {}
1311 ContentBlock::Image { source } => {
1312 content_parts.push(ApiInputContent::Image {
1313 image_url: format!(
1314 "data:{};base64,{}",
1315 source.media_type, source.data
1316 ),
1317 });
1318 }
1319 ContentBlock::Document { source } => {
1320 content_parts.push(ApiInputContent::File {
1321 filename: suggested_filename(&source.media_type),
1322 file_data: format!(
1323 "data:{};base64,{}",
1324 source.media_type, source.data
1325 ),
1326 });
1327 }
1328 ContentBlock::ToolUse {
1329 id, name, input, ..
1330 } => {
1331 items.push(ApiInputItem::FunctionCall(ApiFunctionCall::new(
1332 id.clone(),
1333 name.clone(),
1334 serde_json::to_string(input).unwrap_or_default(),
1335 )));
1336 }
1337 ContentBlock::ToolResult {
1338 tool_use_id,
1339 content,
1340 ..
1341 } => {
1342 items.push(ApiInputItem::FunctionCallOutput(
1343 ApiFunctionCallOutput::new(tool_use_id.clone(), content.clone()),
1344 ));
1345 }
1346 _ => {
1349 log::warn!("Skipping unrecognized OpenAI Responses content block");
1350 }
1351 }
1352 }
1353
1354 if !content_parts.is_empty() {
1355 items.push(ApiInputItem::Message(ApiMessage {
1356 role: match msg.role {
1357 agent_sdk_foundation::llm::Role::User => ApiRole::User,
1358 agent_sdk_foundation::llm::Role::Assistant => ApiRole::Assistant,
1359 },
1360 content: ApiMessageContent::Parts(content_parts),
1361 }));
1362 }
1363 }
1364 }
1365 }
1366
1367 items
1368}
1369
1370fn fix_schema_for_strict_mode(schema: &mut serde_json::Value) {
1373 let Some(obj) = schema.as_object_mut() else {
1374 return;
1375 };
1376
1377 let is_object_type = obj
1379 .get("type")
1380 .is_some_and(|t| t.as_str() == Some("object"));
1381
1382 if is_object_type {
1383 obj.insert(
1385 "additionalProperties".to_owned(),
1386 serde_json::Value::Bool(false),
1387 );
1388
1389 if let Some(serde_json::Value::Object(props)) = obj.get("properties") {
1391 let all_keys: Vec<serde_json::Value> = props
1392 .keys()
1393 .map(|k| serde_json::Value::String(k.clone()))
1394 .collect();
1395 obj.insert("required".to_owned(), serde_json::Value::Array(all_keys));
1396 }
1397 }
1398
1399 if let Some(props) = obj.get_mut("properties")
1401 && let Some(props_obj) = props.as_object_mut()
1402 {
1403 for prop_schema in props_obj.values_mut() {
1404 fix_schema_for_strict_mode(prop_schema);
1405 }
1406 }
1407
1408 if let Some(items) = obj.get_mut("items") {
1410 fix_schema_for_strict_mode(items);
1411 }
1412
1413 for key in ["anyOf", "oneOf", "allOf"] {
1415 if let Some(arr) = obj.get_mut(key)
1416 && let Some(arr_items) = arr.as_array_mut()
1417 {
1418 for item in arr_items {
1419 fix_schema_for_strict_mode(item);
1420 }
1421 }
1422 }
1423}
1424
1425fn convert_tool(tool: agent_sdk_foundation::llm::Tool) -> ApiTool {
1426 let mut schema = tool.input_schema;
1431 fix_schema_for_strict_mode(&mut schema);
1432
1433 ApiTool {
1434 r#type: "function".to_owned(),
1435 name: tool.name,
1436 description: Some(tool.description),
1437 parameters: Some(schema),
1438 strict: Some(true),
1439 }
1440}
1441
/// Picks a placeholder filename for an inline attachment based on its MIME type.
///
/// Unknown media types fall back to `attachment.bin`.
fn suggested_filename(media_type: &str) -> String {
    let name = match media_type {
        "application/pdf" => "attachment.pdf",
        "image/png" => "image.png",
        "image/jpeg" => "image.jpg",
        "image/gif" => "image.gif",
        "image/webp" => "image.webp",
        _ => "attachment.bin",
    };
    name.to_owned()
}
1452
1453fn build_content_blocks(output: &[ApiOutputItem]) -> Vec<ContentBlock> {
1454 let mut blocks = Vec::new();
1455
1456 for item in output {
1457 match item {
1458 ApiOutputItem::Message { content, .. } => {
1459 for c in content {
1460 if let ApiOutputContent::Text { text } = c
1461 && !text.is_empty()
1462 {
1463 blocks.push(ContentBlock::Text { text: text.clone() });
1464 }
1465 }
1466 }
1467 ApiOutputItem::FunctionCall {
1468 call_id,
1469 name,
1470 arguments,
1471 ..
1472 } => {
1473 let input =
1474 serde_json::from_str(arguments).unwrap_or_else(|_| serde_json::json!({}));
1475 blocks.push(ContentBlock::ToolUse {
1476 id: call_id.clone(),
1477 name: name.clone(),
1478 input,
1479 thought_signature: None,
1480 });
1481 }
1482 ApiOutputItem::Unknown => {
1483 }
1485 }
1486 }
1487
1488 blocks
1489}
1490
1491fn build_api_reasoning(thinking: Option<&ThinkingConfig>) -> Option<ApiReasoning> {
1492 thinking
1493 .and_then(resolve_reasoning_effort)
1494 .map(|effort| ApiReasoning { effort })
1495}
1496
1497const fn resolve_reasoning_effort(config: &ThinkingConfig) -> Option<ReasoningEffort> {
1498 if let Some(effort) = config.effort {
1499 return Some(map_effort(effort));
1500 }
1501
1502 match &config.mode {
1503 ThinkingMode::Adaptive => None,
1504 ThinkingMode::Enabled { budget_tokens } => Some(map_budget_to_reasoning(*budget_tokens)),
1505 }
1506}
1507
1508const fn map_effort(effort: Effort) -> ReasoningEffort {
1509 match effort {
1510 Effort::Low => ReasoningEffort::Low,
1511 Effort::Medium => ReasoningEffort::Medium,
1512 Effort::High => ReasoningEffort::High,
1513 Effort::Max => ReasoningEffort::XHigh,
1514 }
1515}
1516
1517const fn map_reasoning_effort(effort: ReasoningEffort) -> Effort {
1518 match effort {
1519 ReasoningEffort::Low => Effort::Low,
1520 ReasoningEffort::Medium => Effort::Medium,
1521 ReasoningEffort::High => Effort::High,
1522 ReasoningEffort::XHigh => Effort::Max,
1523 }
1524}
1525
1526const fn map_budget_to_reasoning(budget_tokens: u32) -> ReasoningEffort {
1527 if budget_tokens <= 4_096 {
1528 ReasoningEffort::Low
1529 } else if budget_tokens <= 16_384 {
1530 ReasoningEffort::Medium
1531 } else if budget_tokens <= 32_768 {
1532 ReasoningEffort::High
1533 } else {
1534 ReasoningEffort::XHigh
1535 }
1536}
1537
/// Resolves the Codex `responses` HTTP endpoint from a configured base URL.
///
/// Trailing slashes are ignored. The `/codex/responses` path is completed only as far as
/// the base does not already contain it:
/// - a base ending in `/codex/responses` is used unchanged;
/// - a base ending in `/codex` gets `/responses` appended;
/// - any other base gets `/codex/responses` appended.
fn codex_url(base_url: &str) -> String {
    let base = base_url.trim_end_matches('/');
    let missing_suffix = if base.ends_with("/codex/responses") {
        ""
    } else if base.ends_with("/codex") {
        "/responses"
    } else {
        "/codex/responses"
    };
    format!("{base}{missing_suffix}")
}
1548
1549fn codex_websocket_url(base_url: &str) -> Result<url::Url> {
1550 let mut url = url::Url::parse(&codex_url(base_url))
1551 .context("failed to parse OpenAI Codex websocket URL")?;
1552
1553 let scheme = match url.scheme() {
1554 "http" => Some("ws"),
1555 "https" => Some("wss"),
1556 _ => None,
1557 };
1558
1559 if let Some(scheme) = scheme {
1560 let _ = url.set_scheme(scheme);
1561 }
1562
1563 Ok(url)
1564}
1565
1566fn extract_account_id(token: &str) -> Result<String> {
1567 let payload = token
1568 .split('.')
1569 .nth(1)
1570 .ok_or_else(|| anyhow::anyhow!("invalid OpenAI Codex OAuth token"))?;
1571 let decoded = base64::engine::general_purpose::URL_SAFE_NO_PAD
1572 .decode(payload)
1573 .context("failed to decode OpenAI Codex token payload")?;
1574 let payload: serde_json::Value =
1575 serde_json::from_slice(&decoded).context("failed to parse OpenAI Codex token payload")?;
1576 payload
1577 .get(OPENAI_CODEX_JWT_CLAIM_PATH)
1578 .and_then(|value| value.get("chatgpt_account_id"))
1579 .and_then(serde_json::Value::as_str)
1580 .map(ToOwned::to_owned)
1581 .ok_or_else(|| anyhow::anyhow!("chatgpt_account_id missing from OpenAI Codex token"))
1582}
1583
/// Returns `true` when `value` is empty or consists only of Unicode whitespace.
///
/// Used as a serde `skip_serializing_if` predicate so blank instructions are omitted.
fn is_empty(value: &str) -> bool {
    value.chars().all(char::is_whitespace)
}
1587
/// Function-call state accumulated from streamed `response.function_call_arguments.delta`
/// events. The SSE path keeps these in a map keyed by call id.
struct ToolCallAccumulator {
    /// The call id (`call_id`) these arguments belong to.
    id: String,
    /// Function name. In the SSE path it is taken from the first delta seen for this call id,
    /// or left as an empty string if that delta carried no name.
    name: String,
    /// Raw JSON argument text, concatenated from every delta in arrival order.
    arguments: String,
}
1597
1598fn usage_from_api_usage(usage: &ApiUsage) -> Usage {
1599 Usage {
1600 input_tokens: usage.input_tokens,
1601 output_tokens: usage.output_tokens,
1602 cached_input_tokens: usage
1603 .input_tokens_details
1604 .as_ref()
1605 .map_or(0, |details| details.cached_tokens),
1606 cache_creation_input_tokens: 0,
1607 }
1608}
1609
1610fn emit_accumulated_tool_calls(
1611 tool_calls: &HashMap<String, ToolCallAccumulator>,
1612) -> Vec<StreamDelta> {
1613 let block_index = usize::from(!tool_calls.is_empty());
1614 let mut deltas = Vec::new();
1615 for acc in tool_calls.values() {
1616 deltas.push(StreamDelta::ToolUseStart {
1617 id: acc.id.clone(),
1618 name: acc.name.clone(),
1619 block_index,
1620 thought_signature: None,
1621 });
1622 deltas.push(StreamDelta::ToolInputDelta {
1623 id: acc.id.clone(),
1624 delta: acc.arguments.clone(),
1625 block_index,
1626 });
1627 }
1628 deltas
1629}
1630
1631fn stop_reason_from_stream_state(
1632 tool_calls: &HashMap<String, ToolCallAccumulator>,
1633 status: Option<ApiStatus>,
1634) -> StopReason {
1635 if tool_calls.is_empty() {
1636 match status.unwrap_or(ApiStatus::Completed) {
1637 ApiStatus::Completed => StopReason::EndTurn,
1638 ApiStatus::Incomplete => StopReason::MaxTokens,
1639 ApiStatus::Failed => StopReason::StopSequence,
1640 }
1641 } else {
1642 StopReason::ToolUse
1643 }
1644}
1645
1646fn reset_websocket_connection(session: &mut WebsocketSessionState) {
1647 session.connection = None;
1648 if session.prewarmed {
1649 session.last_request = None;
1650 session.last_response_id = None;
1651 session.last_response_items.clear();
1652 }
1653 session.prewarmed = false;
1654}
1655
1656fn parse_wrapped_websocket_error_event(payload: &str) -> Option<(StatusCode, String)> {
1657 let event: ApiWrappedWebsocketErrorEvent = serde_json::from_str(payload).ok()?;
1658 if event.kind != "error" {
1659 return None;
1660 }
1661
1662 if event.error.as_ref().and_then(|error| error.code.as_deref())
1663 == Some(OPENAI_CODEX_WEBSOCKET_CONNECTION_LIMIT_REACHED_CODE)
1664 {
1665 let message = event
1666 .error
1667 .and_then(|error| error.message)
1668 .unwrap_or_else(|| "Responses websocket connection limit reached".to_string());
1669 return Some((StatusCode::TOO_MANY_REQUESTS, message));
1670 }
1671
1672 let status = StatusCode::from_u16(event.status?).ok()?;
1673 let message = event
1674 .error
1675 .and_then(|error| error.message)
1676 .unwrap_or_else(|| payload.to_string());
1677 if status.is_success() {
1678 None
1679 } else {
1680 Some((status, message))
1681 }
1682}
1683
1684fn output_item_to_input_item(item: ApiOutputItem) -> Option<ApiInputItem> {
1685 match item {
1686 ApiOutputItem::Message { content, .. } => {
1687 let parts: Vec<ApiInputContent> = content
1688 .into_iter()
1689 .filter_map(|content| match content {
1690 ApiOutputContent::Text { text } if !text.is_empty() => {
1691 Some(ApiInputContent::OutputText { text })
1692 }
1693 ApiOutputContent::Unknown | ApiOutputContent::Text { .. } => None,
1694 })
1695 .collect();
1696 if parts.is_empty() {
1697 None
1698 } else {
1699 Some(ApiInputItem::Message(ApiMessage {
1700 role: ApiRole::Assistant,
1701 content: ApiMessageContent::Parts(parts),
1702 }))
1703 }
1704 }
1705 ApiOutputItem::FunctionCall {
1706 call_id,
1707 name,
1708 arguments,
1709 } => Some(ApiInputItem::FunctionCall(ApiFunctionCall::new(
1710 call_id, name, arguments,
1711 ))),
1712 ApiOutputItem::Unknown => None,
1713 }
1714}
1715
1716fn prepare_websocket_request(
1717 request: &ApiStreamingRequest,
1718 session: &WebsocketSessionState,
1719 allow_empty_delta: bool,
1720) -> ApiWebsocketRequest {
1721 let mut websocket_request = ApiWebsocketRequest::from(request);
1722
1723 let Some(last_request) = session.last_request.as_ref() else {
1724 return websocket_request;
1725 };
1726 let Some(last_response_id) = session.last_response_id.as_ref() else {
1727 return websocket_request;
1728 };
1729
1730 let mut previous_without_input = last_request.clone();
1731 previous_without_input.input.clear();
1732 let mut current_without_input = request.clone();
1733 current_without_input.input.clear();
1734 if previous_without_input != current_without_input {
1735 return websocket_request;
1736 }
1737
1738 let mut baseline = last_request.input.clone();
1739 baseline.extend(session.last_response_items.clone());
1740 if request.input.starts_with(&baseline)
1741 && (allow_empty_delta || baseline.len() < request.input.len())
1742 {
1743 websocket_request.previous_response_id = Some(last_response_id.clone());
1744 websocket_request.input = request.input[baseline.len()..].to_vec();
1745 }
1746
1747 websocket_request
1748}
1749
/// Borrowed Responses API request body.
///
/// Unlike `ApiStreamingRequest`, it has no `stream` field. It is presumably the
/// non-streaming request shape; confirm at the call site.
///
/// Field declaration order is the JSON key order. `None` fields are omitted. `instructions`
/// is omitted when empty or whitespace-only (see `is_empty`).
#[derive(Serialize)]
struct ApiResponsesRequest<'a> {
    model: &'a str,
    #[serde(skip_serializing_if = "is_empty")]
    instructions: &'a str,
    input: &'a [ApiInputItem],
    #[serde(skip_serializing_if = "Option::is_none")]
    tools: Option<&'a [ApiTool]>,
    #[serde(skip_serializing_if = "Option::is_none")]
    max_output_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    reasoning: Option<ApiReasoning>,
    #[serde(skip_serializing_if = "Option::is_none")]
    tool_choice: Option<&'static str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    parallel_tool_calls: Option<bool>,
    // Always serialized, including `false`.
    store: bool,
    #[serde(skip_serializing_if = "Option::is_none")]
    text: Option<ApiTextSettings>,
    #[serde(skip_serializing_if = "Option::is_none")]
    include: Option<&'a [&'static str]>,
    #[serde(skip_serializing_if = "Option::is_none")]
    prompt_cache_key: Option<&'a str>,
}
1778
/// Owned request body for streaming Responses requests (sent over SSE, or converted into an
/// `ApiWebsocketRequest`).
///
/// It derives `Clone` and `PartialEq` because the websocket session stores the last request
/// and `prepare_websocket_request` compares it with the next one.
///
/// Field declaration order is the JSON key order. `None` fields are omitted.
/// `instructions` is omitted only when exactly empty (`String::is_empty`); unlike
/// `ApiResponsesRequest`, whitespace-only instructions are still sent.
#[derive(Clone, PartialEq, Serialize)]
struct ApiStreamingRequest {
    model: String,
    #[serde(skip_serializing_if = "String::is_empty")]
    instructions: String,
    input: Vec<ApiInputItem>,
    #[serde(skip_serializing_if = "Option::is_none")]
    tools: Option<Vec<ApiTool>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    max_output_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    reasoning: Option<ApiReasoning>,
    #[serde(skip_serializing_if = "Option::is_none")]
    tool_choice: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    parallel_tool_calls: Option<bool>,
    // Always serialized, including `false`.
    store: bool,
    #[serde(skip_serializing_if = "Option::is_none")]
    text: Option<ApiTextSettings>,
    #[serde(skip_serializing_if = "Option::is_none")]
    include: Option<Vec<String>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    prompt_cache_key: Option<String>,
    stream: bool,
}
1804
/// A websocket message asking the server to create a response.
///
/// `kind` is serialized as the JSON `type` key; the `From<&ApiStreamingRequest>` impl sets
/// it to `"response.create"`. The remaining fields mirror `ApiStreamingRequest`, plus two
/// websocket-only fields:
/// - `previous_response_id`, which `prepare_websocket_request` fills in when only an input
///   delta is sent;
/// - `generate`.
#[derive(Clone, Serialize)]
struct ApiWebsocketRequest {
    #[serde(rename = "type")]
    kind: &'static str,
    model: String,
    #[serde(skip_serializing_if = "String::is_empty")]
    instructions: String,
    // When set, `input` holds only the items added since that response.
    #[serde(skip_serializing_if = "Option::is_none")]
    previous_response_id: Option<String>,
    input: Vec<ApiInputItem>,
    #[serde(skip_serializing_if = "Option::is_none")]
    tools: Option<Vec<ApiTool>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    max_output_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    reasoning: Option<ApiReasoning>,
    #[serde(skip_serializing_if = "Option::is_none")]
    tool_choice: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    parallel_tool_calls: Option<bool>,
    store: bool,
    #[serde(skip_serializing_if = "Option::is_none")]
    text: Option<ApiTextSettings>,
    #[serde(skip_serializing_if = "Option::is_none")]
    include: Option<Vec<String>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    prompt_cache_key: Option<String>,
    stream: bool,
    // NOTE(review): semantics not visible here; presumably lets a request (e.g. a prewarm)
    // skip generation — confirm with the websocket protocol docs.
    #[serde(skip_serializing_if = "Option::is_none")]
    generate: Option<bool>,
}
1836
1837impl From<&ApiStreamingRequest> for ApiWebsocketRequest {
1838 fn from(request: &ApiStreamingRequest) -> Self {
1839 Self {
1840 kind: "response.create",
1841 model: request.model.clone(),
1842 instructions: request.instructions.clone(),
1843 previous_response_id: None,
1844 input: request.input.clone(),
1845 tools: request.tools.clone(),
1846 max_output_tokens: request.max_output_tokens,
1847 reasoning: request.reasoning.clone(),
1848 tool_choice: request.tool_choice.clone(),
1849 parallel_tool_calls: request.parallel_tool_calls,
1850 store: request.store,
1851 text: request.text,
1852 include: request.include.clone(),
1853 prompt_cache_key: request.prompt_cache_key.clone(),
1854 stream: request.stream,
1855 generate: None,
1856 }
1857 }
1858}
1859
/// The request's `text` object, currently carrying only the output `verbosity` setting.
#[derive(Clone, Copy, PartialEq, Serialize)]
struct ApiTextSettings {
    verbosity: &'static str,
}

/// The request's `reasoning` object (`{"effort": "<level>"}`).
#[derive(Clone, PartialEq, Serialize)]
struct ApiReasoning {
    effort: ReasoningEffort,
}
1869
/// One element of the request `input` array.
///
/// `untagged` serializes each variant as its inner value directly. The function-call
/// variants carry their own `type` field.
#[derive(Clone, PartialEq, Serialize)]
#[serde(untagged)]
enum ApiInputItem {
    Message(ApiMessage),
    FunctionCall(ApiFunctionCall),
    FunctionCallOutput(ApiFunctionCallOutput),
}

/// A conversational message input item (`{"role": ..., "content": ...}`).
#[derive(Clone, PartialEq, Serialize)]
struct ApiMessage {
    role: ApiRole,
    content: ApiMessageContent,
}

/// Message role, serialized as `"user"` or `"assistant"`.
#[derive(Clone, Copy, PartialEq, Serialize)]
#[serde(rename_all = "lowercase")]
enum ApiRole {
    User,
    Assistant,
}

/// Message content: either a plain JSON string or an array of typed content parts.
#[derive(Clone, PartialEq, Serialize)]
#[serde(untagged)]
enum ApiMessageContent {
    Text(String),
    Parts(Vec<ApiInputContent>),
}

/// A typed content part, tagged by its `type` field.
#[derive(Clone, PartialEq, Serialize)]
#[serde(tag = "type")]
enum ApiInputContent {
    // User-authored text.
    #[serde(rename = "input_text")]
    InputText { text: String },
    // Assistant text replayed from an earlier turn.
    #[serde(rename = "output_text")]
    OutputText { text: String },
    // Image given as a URL; this file passes `data:<media type>;base64,...` URLs.
    #[serde(rename = "input_image")]
    Image { image_url: String },
    // File given inline; this file passes a `data:` URL in `file_data`.
    #[serde(rename = "input_file")]
    File { filename: String, file_data: String },
}
1910
/// A `function_call` input item that replays a tool call made by the model.
#[derive(Clone, PartialEq, Serialize)]
struct ApiFunctionCall {
    // Always "function_call" (set by `new`).
    r#type: &'static str,
    call_id: String,
    name: String,
    // JSON-encoded argument object, as a string.
    arguments: String,
}

impl ApiFunctionCall {
    /// Creates a `function_call` item with the given call id, function name and JSON
    /// argument string.
    const fn new(call_id: String, name: String, arguments: String) -> Self {
        Self {
            r#type: "function_call",
            call_id,
            name,
            arguments,
        }
    }
}

/// A `function_call_output` input item carrying a tool's result for `call_id`.
#[derive(Clone, PartialEq, Serialize)]
struct ApiFunctionCallOutput {
    // Always "function_call_output" (set by `new`).
    r#type: &'static str,
    call_id: String,
    output: String,
}

impl ApiFunctionCallOutput {
    /// Creates a `function_call_output` item answering the call identified by `call_id`.
    const fn new(call_id: String, output: String) -> Self {
        Self {
            r#type: "function_call_output",
            call_id,
            output,
        }
    }
}
1946
/// A tool definition in the request `tools` array.
///
/// `convert_tool` fills it with `type: "function"`, a strict-mode parameter schema, and
/// `strict: true`. Optional fields are omitted when `None`.
#[derive(Clone, PartialEq, Serialize)]
struct ApiTool {
    r#type: String,
    name: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    description: Option<String>,
    // JSON Schema for the function arguments.
    #[serde(skip_serializing_if = "Option::is_none")]
    parameters: Option<serde_json::Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    strict: Option<bool>,
}
1958
/// A complete Responses API response object.
///
/// `status` and `usage` default to `None` when absent.
#[derive(Deserialize)]
struct ApiResponse {
    id: String,
    model: String,
    output: Vec<ApiOutputItem>,
    #[serde(default)]
    status: Option<ApiStatus>,
    #[serde(default)]
    usage: Option<ApiUsage>,
}

/// Terminal response status, deserialized from its snake_case name.
///
/// NOTE(review): there is no catch-all variant, so any other status string (e.g.
/// `in_progress` or `cancelled`) makes the enclosing deserialization fail. Confirm the
/// endpoint never returns one where this type is used.
#[derive(Clone, Copy, Deserialize)]
#[serde(rename_all = "snake_case")]
enum ApiStatus {
    Completed,
    Incomplete,
    Failed,
}

/// Token usage reported for a response.
#[derive(Deserialize)]
struct ApiUsage {
    input_tokens: u32,
    output_tokens: u32,
    #[serde(default)]
    input_tokens_details: Option<ApiInputTokensDetails>,
}

/// Breakdown of input tokens. `cached_tokens` defaults to 0 when absent.
#[derive(Deserialize)]
struct ApiInputTokensDetails {
    #[serde(default)]
    cached_tokens: u32,
}
1995
/// One item of a response's `output` array, tagged by its `type` field.
///
/// Item types other than `message` and `function_call` (e.g. reasoning items) deserialize
/// to `Unknown` instead of failing.
#[derive(Deserialize)]
#[serde(tag = "type")]
enum ApiOutputItem {
    #[serde(rename = "message")]
    Message {
        // Required in the JSON but otherwise unused.
        #[serde(rename = "role")]
        _role: String,
        content: Vec<ApiOutputContent>,
    },
    #[serde(rename = "function_call")]
    FunctionCall {
        call_id: String,
        name: String,
        // Raw JSON argument string as produced by the model.
        arguments: String,
    },
    #[serde(other)]
    Unknown,
}

/// One content part of an output message, tagged by its `type` field.
///
/// Only `output_text` is modelled; any other part type becomes `Unknown`.
#[derive(Deserialize)]
#[serde(tag = "type")]
enum ApiOutputContent {
    #[serde(rename = "output_text")]
    Text { text: String },
    #[serde(other)]
    Unknown,
}
2023
/// One event from the streaming Responses endpoint.
///
/// Only `type` is required; all other fields default to `None`, so one struct can represent
/// every event kind. Which fields are populated depends on `type`. For example,
/// `response.output_text.delta` carries `delta`, while `response.completed` carries
/// `response`.
#[derive(Deserialize)]
struct ApiStreamEvent {
    r#type: String,
    #[serde(default)]
    delta: Option<String>,
    #[serde(default)]
    call_id: Option<String>,
    #[serde(default)]
    name: Option<String>,
    // Kept as untyped JSON; its shape depends on the event type.
    #[serde(default)]
    item: Option<serde_json::Value>,
    #[serde(default)]
    response: Option<ApiStreamResponse>,
}

/// The `response` object embedded in lifecycle events such as `response.completed` and
/// `response.failed`.
#[derive(Deserialize)]
struct ApiStreamResponse {
    #[serde(default)]
    id: Option<String>,
    #[serde(default)]
    usage: Option<ApiUsage>,
    #[serde(default)]
    error: Option<ApiErrorBody>,
}

/// Error details attached to a failed response.
#[derive(Deserialize)]
struct ApiErrorBody {
    #[serde(default)]
    message: Option<String>,
}

/// Error body inside a wrapped websocket error event.
///
/// `parse_wrapped_websocket_error_event` checks `code` for the connection-limit error.
#[derive(Deserialize)]
struct ApiWrappedWebsocketErrorBody {
    #[serde(default)]
    code: Option<String>,
    #[serde(default)]
    message: Option<String>,
}

/// A websocket frame of the form `{"type": "error", "status": ..., "error": {...}}`.
///
/// The HTTP-like status is accepted under either `status` or `status_code`. A missing
/// status or error body deserializes as `None`.
#[derive(Deserialize)]
struct ApiWrappedWebsocketErrorEvent {
    #[serde(rename = "type")]
    kind: String,
    #[serde(alias = "status_code")]
    status: Option<u16>,
    #[serde(default)]
    error: Option<ApiWrappedWebsocketErrorBody>,
}
2076
2077#[cfg(test)]
2082mod tests {
2083 use super::*;
2084
2085 #[test]
2086 fn test_model_constant() {
2087 assert_eq!(MODEL_GPT54, "gpt-5.4");
2088 assert_eq!(MODEL_GPT53_CODEX, "gpt-5.3-codex");
2089 assert_eq!(MODEL_GPT52_CODEX, "gpt-5.2-codex");
2090 }
2091
2092 #[test]
2093 fn test_codex_factory() {
2094 let provider = OpenAICodexResponsesProvider::codex("test-key".to_string());
2095 assert_eq!(provider.model(), MODEL_GPT53_CODEX);
2096 assert_eq!(provider.provider(), "openai-codex");
2097 }
2098
2099 #[test]
2100 fn test_gpt54_factory() {
2101 let provider = OpenAICodexResponsesProvider::gpt54("test-key".to_string());
2102 assert_eq!(provider.model(), MODEL_GPT54);
2103 assert_eq!(provider.provider(), "openai-codex");
2104 }
2105
2106 #[test]
2107 fn test_gpt53_codex_factory() {
2108 let provider = OpenAICodexResponsesProvider::gpt53_codex("test-key".to_string());
2109 assert_eq!(provider.model(), MODEL_GPT53_CODEX);
2110 assert_eq!(provider.provider(), "openai-codex");
2111 }
2112
2113 #[test]
2114 fn test_reasoning_effort_serialization() {
2115 let low = serde_json::to_string(&ReasoningEffort::Low).unwrap();
2116 assert_eq!(low, "\"low\"");
2117
2118 let xhigh = serde_json::to_string(&ReasoningEffort::XHigh).unwrap();
2119 assert_eq!(xhigh, "\"xhigh\"");
2120 }
2121
2122 #[test]
2123 fn test_with_reasoning_effort() {
2124 let provider = OpenAICodexResponsesProvider::codex("test-key".to_string())
2125 .with_reasoning_effort(ReasoningEffort::High);
2126 let thinking = provider.thinking.as_ref().unwrap();
2127 assert!(matches!(thinking.effort, Some(Effort::High)));
2128 }
2129
2130 #[test]
2131 fn test_build_api_reasoning_uses_explicit_effort() {
2132 let reasoning =
2133 build_api_reasoning(Some(&ThinkingConfig::adaptive_with_effort(Effort::Low))).unwrap();
2134 assert!(matches!(reasoning.effort, ReasoningEffort::Low));
2135 }
2136
2137 #[test]
2138 fn test_build_api_reasoning_omits_adaptive_without_effort() {
2139 assert!(build_api_reasoning(Some(&ThinkingConfig::adaptive())).is_none());
2140 }
2141
2142 #[test]
2143 fn test_openai_responses_rejects_adaptive_thinking() {
2144 let provider = OpenAICodexResponsesProvider::codex("test-key".to_string());
2145 let error = provider
2146 .validate_thinking_config(Some(&ThinkingConfig::adaptive()))
2147 .unwrap_err();
2148 assert!(
2149 error
2150 .to_string()
2151 .contains("adaptive thinking is not supported")
2152 );
2153 }
2154
2155 #[test]
2156 fn test_api_tool_serialization() {
2157 let tool = ApiTool {
2158 r#type: "function".to_owned(),
2159 name: "get_weather".to_owned(),
2160 description: Some("Get weather".to_owned()),
2161 parameters: Some(serde_json::json!({"type": "object"})),
2162 strict: Some(true),
2163 };
2164
2165 let json = serde_json::to_string(&tool).unwrap();
2166 assert!(json.contains("\"type\":\"function\""));
2167 assert!(json.contains("\"name\":\"get_weather\""));
2168 assert!(json.contains("\"strict\":true"));
2169 }
2170
2171 fn test_token() -> String {
2172 let header = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(r#"{"alg":"none"}"#);
2173 let payload = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(format!(
2174 r#"{{"{OPENAI_CODEX_JWT_CLAIM_PATH}":{{"chatgpt_account_id":"acct_123"}}}}"#
2175 ));
2176 format!("{header}.{payload}.sig")
2177 }
2178
2179 #[test]
2180 fn test_build_headers_match_codex_style_defaults() -> anyhow::Result<()> {
2181 let provider = OpenAICodexResponsesProvider::codex(test_token());
2182
2183 let headers = provider.build_headers(true, Some("session-123"), None)?;
2184 assert_eq!(headers.get("originator").unwrap(), OPENAI_CODEX_ORIGINATOR);
2185 assert_eq!(headers.get("chatgpt-account-id").unwrap(), "acct_123");
2186 assert_eq!(headers.get("session_id").unwrap(), "session-123");
2187 assert_eq!(headers.get("x-client-request-id").unwrap(), "session-123");
2188 assert_eq!(
2189 headers.get("OpenAI-Beta").unwrap(),
2190 OPENAI_CODEX_RESPONSES_BETA_HEADER
2191 );
2192
2193 Ok(())
2194 }
2195
2196 #[test]
2197 fn test_build_websocket_headers_match_codex_style_defaults() -> anyhow::Result<()> {
2198 let provider = OpenAICodexResponsesProvider::codex(test_token());
2199
2200 let headers = provider.build_websocket_headers(Some("session-123"), Some("turn-1"))?;
2201 assert_eq!(headers.get("originator").unwrap(), OPENAI_CODEX_ORIGINATOR);
2202 assert_eq!(headers.get("chatgpt-account-id").unwrap(), "acct_123");
2203 assert_eq!(headers.get("session_id").unwrap(), "session-123");
2204 assert_eq!(headers.get("x-client-request-id").unwrap(), "session-123");
2205 assert_eq!(
2206 headers.get(OPENAI_CODEX_TURN_STATE_HEADER).unwrap(),
2207 "turn-1"
2208 );
2209 assert_eq!(
2210 headers.get("OpenAI-Beta").unwrap(),
2211 OPENAI_CODEX_RESPONSES_WEBSOCKETS_BETA_HEADER,
2212 );
2213
2214 Ok(())
2215 }
2216
2217 #[test]
2218 fn test_build_headers_uses_configured_account_id_without_jwt_decode() -> anyhow::Result<()> {
2219 let provider = OpenAICodexResponsesProvider::codex("not-a-jwt".to_string())
2220 .with_account_id("acct_stored");
2221
2222 let headers = provider.build_headers(true, Some("session-123"), Some("turn-1"))?;
2223 assert_eq!(headers.get("chatgpt-account-id").unwrap(), "acct_stored");
2224 assert_eq!(
2225 headers.get(OPENAI_CODEX_TURN_STATE_HEADER).unwrap(),
2226 "turn-1"
2227 );
2228
2229 Ok(())
2230 }
2231
2232 #[test]
2233 fn test_request_serialization_includes_store_false() {
2234 let request = ApiStreamingRequest {
2235 model: MODEL_GPT53_CODEX.to_string(),
2236 instructions: "system".to_string(),
2237 input: Vec::new(),
2238 tools: None,
2239 max_output_tokens: None,
2240 reasoning: None,
2241 tool_choice: Some("auto".to_string()),
2242 parallel_tool_calls: Some(true),
2243 store: false,
2244 text: Some(ApiTextSettings {
2245 verbosity: "medium",
2246 }),
2247 include: Some(vec!["reasoning.encrypted_content".to_string()]),
2248 prompt_cache_key: Some("session-123".to_string()),
2249 stream: true,
2250 };
2251
2252 let json = serde_json::to_string(&request).unwrap();
2253 assert!(json.contains("\"store\":false"));
2254 assert!(json.contains("\"stream\":true"));
2255 }
2256
2257 #[test]
2258 fn test_prepare_websocket_request_uses_previous_response_id_for_incremental_input() {
2259 let request = ApiStreamingRequest {
2260 model: MODEL_GPT53_CODEX.to_string(),
2261 instructions: "system".to_string(),
2262 input: vec![
2263 ApiInputItem::Message(ApiMessage {
2264 role: ApiRole::User,
2265 content: ApiMessageContent::Text("first".to_string()),
2266 }),
2267 ApiInputItem::Message(ApiMessage {
2268 role: ApiRole::Assistant,
2269 content: ApiMessageContent::Parts(vec![ApiInputContent::OutputText {
2270 text: "answer".to_string(),
2271 }]),
2272 }),
2273 ApiInputItem::Message(ApiMessage {
2274 role: ApiRole::User,
2275 content: ApiMessageContent::Text("follow up".to_string()),
2276 }),
2277 ],
2278 tools: None,
2279 max_output_tokens: None,
2280 reasoning: None,
2281 tool_choice: Some("auto".to_string()),
2282 parallel_tool_calls: None,
2283 store: false,
2284 text: Some(ApiTextSettings {
2285 verbosity: "medium",
2286 }),
2287 include: Some(vec!["reasoning.encrypted_content".to_string()]),
2288 prompt_cache_key: Some("thread-1".to_string()),
2289 stream: true,
2290 };
2291 let previous_request = ApiStreamingRequest {
2292 input: vec![ApiInputItem::Message(ApiMessage {
2293 role: ApiRole::User,
2294 content: ApiMessageContent::Text("first".to_string()),
2295 })],
2296 ..request.clone()
2297 };
2298 let session = WebsocketSessionState {
2299 connection: None,
2300 last_request: Some(previous_request),
2301 last_response_id: Some("resp_prev".to_string()),
2302 last_response_items: vec![ApiInputItem::Message(ApiMessage {
2303 role: ApiRole::Assistant,
2304 content: ApiMessageContent::Parts(vec![ApiInputContent::OutputText {
2305 text: "answer".to_string(),
2306 }]),
2307 })],
2308 turn_state: None,
2309 prewarmed: false,
2310 websocket_disabled: false,
2311 };
2312
2313 let websocket_request = prepare_websocket_request(&request, &session, false);
2314 assert_eq!(
2315 websocket_request.previous_response_id.as_deref(),
2316 Some("resp_prev")
2317 );
2318 assert_eq!(websocket_request.input.len(), 1);
2319 match &websocket_request.input[0] {
2320 ApiInputItem::Message(ApiMessage {
2321 role: ApiRole::User,
2322 content: ApiMessageContent::Text(text),
2323 }) => assert_eq!(text, "follow up"),
2324 _ => panic!("expected incremental follow-up user message"),
2325 }
2326 }
2327
2328 #[test]
2329 fn test_parse_wrapped_websocket_error_event_maps_http_status() {
2330 let payload = r#"{"type":"error","status":401,"error":{"message":"unauthorized"}}"#;
2331 let parsed = parse_wrapped_websocket_error_event(payload);
2332 assert_eq!(
2333 parsed,
2334 Some((StatusCode::UNAUTHORIZED, "unauthorized".to_string())),
2335 );
2336 }
2337
2338 #[test]
2339 fn test_parse_wrapped_websocket_error_event_maps_connection_limit() {
2340 let payload = format!(
2341 r#"{{"type":"error","status":429,"error":{{"code":"{OPENAI_CODEX_WEBSOCKET_CONNECTION_LIMIT_REACHED_CODE}","message":"limit"}}}}"#,
2342 );
2343 let parsed = parse_wrapped_websocket_error_event(&payload);
2344 assert_eq!(
2345 parsed,
2346 Some((StatusCode::TOO_MANY_REQUESTS, "limit".to_string())),
2347 );
2348 }
2349
2350 #[test]
2351 fn test_prepare_websocket_request_allows_empty_delta_after_prewarm() {
2352 let request = ApiStreamingRequest {
2353 model: MODEL_GPT53_CODEX.to_string(),
2354 instructions: "system".to_string(),
2355 input: vec![ApiInputItem::Message(ApiMessage {
2356 role: ApiRole::User,
2357 content: ApiMessageContent::Text("first".to_string()),
2358 })],
2359 tools: None,
2360 max_output_tokens: None,
2361 reasoning: None,
2362 tool_choice: Some("auto".to_string()),
2363 parallel_tool_calls: None,
2364 store: false,
2365 text: Some(ApiTextSettings {
2366 verbosity: "medium",
2367 }),
2368 include: Some(vec!["reasoning.encrypted_content".to_string()]),
2369 prompt_cache_key: Some("thread-1".to_string()),
2370 stream: true,
2371 };
2372 let session = WebsocketSessionState {
2373 connection: None,
2374 last_request: Some(request.clone()),
2375 last_response_id: Some("resp_prewarm".to_string()),
2376 last_response_items: Vec::new(),
2377 turn_state: None,
2378 prewarmed: true,
2379 websocket_disabled: false,
2380 };
2381
2382 let websocket_request = prepare_websocket_request(&request, &session, true);
2383 assert_eq!(
2384 websocket_request.previous_response_id.as_deref(),
2385 Some("resp_prewarm")
2386 );
2387 assert!(websocket_request.input.is_empty());
2388 }
2389
2390 #[test]
2391 fn test_api_response_deserialization() {
2392 let json = r#"{
2393 "id": "resp_123",
2394 "model": "gpt-5.2-codex",
2395 "output": [
2396 {
2397 "type": "message",
2398 "role": "assistant",
2399 "content": [
2400 {"type": "output_text", "text": "Hello!"}
2401 ]
2402 }
2403 ],
2404 "status": "completed",
2405 "usage": {
2406 "input_tokens": 100,
2407 "output_tokens": 50
2408 }
2409 }"#;
2410
2411 let response: ApiResponse = serde_json::from_str(json).unwrap();
2412 assert_eq!(response.id, "resp_123");
2413 assert_eq!(response.model, "gpt-5.2-codex");
2414 assert_eq!(response.output.len(), 1);
2415 }
2416
2417 #[test]
2418 fn test_api_response_with_function_call() {
2419 let json = r#"{
2420 "id": "resp_456",
2421 "model": "gpt-5.2-codex",
2422 "output": [
2423 {
2424 "type": "function_call",
2425 "call_id": "call_abc",
2426 "name": "read_file",
2427 "arguments": "{\"path\": \"test.txt\"}"
2428 }
2429 ],
2430 "status": "completed"
2431 }"#;
2432
2433 let response: ApiResponse = serde_json::from_str(json).unwrap();
2434 assert_eq!(response.output.len(), 1);
2435
2436 match &response.output[0] {
2437 ApiOutputItem::FunctionCall {
2438 call_id,
2439 name,
2440 arguments,
2441 } => {
2442 assert_eq!(call_id, "call_abc");
2443 assert_eq!(name, "read_file");
2444 assert!(arguments.contains("test.txt"));
2445 }
2446 _ => panic!("Expected FunctionCall"),
2447 }
2448 }
2449
2450 #[test]
2451 fn test_build_api_input_uses_responses_text_types_by_role() {
2452 let request = ChatRequest {
2453 system: "system".to_string(),
2454 messages: vec![
2455 agent_sdk_foundation::llm::Message::user_with_content(vec![ContentBlock::Text {
2456 text: "question".to_string(),
2457 }]),
2458 agent_sdk_foundation::llm::Message {
2459 role: agent_sdk_foundation::llm::Role::Assistant,
2460 content: Content::Blocks(vec![ContentBlock::Text {
2461 text: "answer".to_string(),
2462 }]),
2463 },
2464 ],
2465 tools: None,
2466 max_tokens: 512,
2467 max_tokens_explicit: false,
2468 session_id: None,
2469 cached_content: None,
2470 thinking: None,
2471 tool_choice: None,
2472 response_format: None,
2473 };
2474
2475 let input = build_api_input(&request);
2476 assert_eq!(input.len(), 2);
2477
2478 match &input[0] {
2479 ApiInputItem::Message(ApiMessage {
2480 role: ApiRole::User,
2481 content: ApiMessageContent::Parts(parts),
2482 }) => assert!(matches!(
2483 parts.as_slice(),
2484 [ApiInputContent::InputText { text }] if text == "question"
2485 )),
2486 _ => panic!("expected user message with input_text content"),
2487 }
2488
2489 match &input[1] {
2490 ApiInputItem::Message(ApiMessage {
2491 role: ApiRole::Assistant,
2492 content: ApiMessageContent::Parts(parts),
2493 }) => assert!(matches!(
2494 parts.as_slice(),
2495 [ApiInputContent::OutputText { text }] if text == "answer"
2496 )),
2497 _ => panic!("expected assistant message with output_text content"),
2498 }
2499 }
2500
2501 #[test]
2502 fn test_api_input_content_serialization_uses_current_responses_tags() {
2503 let json = serde_json::to_string(&ApiMessageContent::Parts(vec![
2504 ApiInputContent::InputText {
2505 text: "prompt".to_string(),
2506 },
2507 ApiInputContent::OutputText {
2508 text: "reply".to_string(),
2509 },
2510 ApiInputContent::Image {
2511 image_url: "data:image/png;base64,abc".to_string(),
2512 },
2513 ApiInputContent::File {
2514 filename: "notes.txt".to_string(),
2515 file_data: "data:text/plain;base64,abc".to_string(),
2516 },
2517 ]))
2518 .unwrap();
2519
2520 assert!(json.contains("\"type\":\"input_text\""));
2521 assert!(json.contains("\"type\":\"output_text\""));
2522 assert!(json.contains("\"type\":\"input_image\""));
2523 assert!(json.contains("\"type\":\"input_file\""));
2524 }
2525
2526 #[test]
2527 fn test_build_content_blocks_text() {
2528 let output = vec![ApiOutputItem::Message {
2529 _role: "assistant".to_owned(),
2530 content: vec![ApiOutputContent::Text {
2531 text: "Hello!".to_owned(),
2532 }],
2533 }];
2534
2535 let blocks = build_content_blocks(&output);
2536 assert_eq!(blocks.len(), 1);
2537 assert!(matches!(&blocks[0], ContentBlock::Text { text } if text == "Hello!"));
2538 }
2539
2540 #[test]
2541 fn test_build_content_blocks_function_call() {
2542 let output = vec![ApiOutputItem::FunctionCall {
2543 call_id: "call_123".to_owned(),
2544 name: "test_tool".to_owned(),
2545 arguments: r#"{"key": "value"}"#.to_owned(),
2546 }];
2547
2548 let blocks = build_content_blocks(&output);
2549 assert_eq!(blocks.len(), 1);
2550 assert!(
2551 matches!(&blocks[0], ContentBlock::ToolUse { id, name, .. } if id == "call_123" && name == "test_tool")
2552 );
2553 }
2554}