1use super::{
9 CompletionRequest, CompletionResponse, ContentPart, FinishReason, Message, ModelInfo, Provider,
10 Role, StreamChunk, ToolDefinition, Usage,
11};
12use anyhow::{Context, Result};
13use async_trait::async_trait;
14use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
15use futures::stream::BoxStream;
16use futures::{SinkExt, StreamExt};
17use reqwest::{Client, StatusCode};
18use serde::{Deserialize, Serialize};
19use serde_json::{Value, json};
20use sha2::{Digest, Sha256};
21use std::collections::HashMap;
22use std::io::ErrorKind;
23use std::sync::Arc;
24use tokio::io::{AsyncRead, AsyncWrite};
25use tokio::net::TcpStream;
26use tokio::sync::RwLock;
27use tokio_tungstenite::{
28 MaybeTlsStream, WebSocketStream, connect_async,
29 tungstenite::{
30 Error as WsError, Message as WsMessage,
31 client::IntoClientRequest,
32 http::{HeaderValue, Request},
33 },
34};
35
/// Base URL of the OpenAI platform REST API.
const OPENAI_API_URL: &str = "https://api.openai.com/v1";
/// Base URL of the ChatGPT-account Codex backend.
const CHATGPT_CODEX_API_URL: &str = "https://chatgpt.com/backend-api/codex";
/// Responses WebSocket endpoint used when authenticating with a static API key.
const OPENAI_RESPONSES_WS_URL: &str = "wss://api.openai.com/v1/responses";
/// Responses WebSocket endpoint used when authenticating with ChatGPT OAuth credentials.
const CHATGPT_CODEX_RESPONSES_WS_URL: &str = "wss://chatgpt.com/backend-api/codex/responses";
/// OAuth issuer, exposed through `OpenAiCodexProvider::oauth_issuer`.
const AUTH_ISSUER: &str = "https://auth.openai.com";
/// Browser authorization endpoint for the PKCE authorization-code flow.
const AUTHORIZE_URL: &str = "https://auth.openai.com/oauth/authorize";
/// Token endpoint used for code exchange, token refresh, and id-token -> API-key exchange.
const TOKEN_URL: &str = "https://auth.openai.com/oauth/token";
/// OAuth client id sent with authorization and token requests.
const CLIENT_ID: &str = "app_EMoamEEZ73f0CkXaXp7hrann";
/// Default redirect URI used by the authorization URL and `exchange_code`.
const REDIRECT_URI: &str = "http://localhost:1455/auth/callback";
/// Space-separated scopes requested in the authorization URL.
const SCOPE: &str = "openid profile email offline_access";
/// Env var holding the default reasoning level; consulted before `REASONING_EFFORT_ENV`.
/// A `model:level` suffix on the model id takes precedence over both.
const THINKING_LEVEL_ENV: &str = "CODETETHER_OPENAI_CODEX_THINKING_LEVEL";
/// Alternative env var name for the default reasoning level.
const REASONING_EFFORT_ENV: &str = "CODETETHER_OPENAI_CODEX_REASONING_EFFORT";
/// Instructions used when the conversation contains no system-message text.
const DEFAULT_RESPONSES_INSTRUCTIONS: &str = "You are CodeTether Agent running on OpenAI Codex. \
Resolve software tasks directly: inspect the workspace, make focused changes, validate with \
available tools, and report concise results. When model availability or external APIs are involved, \
verify live behavior before treating it as supported.";
52
/// Reasoning-effort level that can be requested for models that support it.
///
/// Parsed from a `model:level` suffix or from the thinking-level env vars, and rendered back
/// with `as_str` as "none", "low", "medium", "high" or "xhigh".
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ThinkingLevel {
    /// Explicitly requests no reasoning (`"none"`).
    NoReasoning,
    Low,
    Medium,
    High,
    /// Highest supported level (`"xhigh"`).
    XHigh,
}
61
/// Which Responses WebSocket endpoint a connection targets.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ResponsesWsBackend {
    /// Public OpenAI endpoint; selected when a static API key is configured.
    OpenAi,
    /// ChatGPT Codex endpoint; selected when no static API key is configured (OAuth mode).
    ChatGptCodex,
}
67
/// Service tier written to a request's `service_tier` field.
///
/// Only produced by the `-fast` model aliases (see `parse_service_tier_model_alias`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CodexServiceTier {
    /// Serialized as `"priority"`.
    Priority,
}
72
73impl ThinkingLevel {
74 fn parse(raw: &str) -> Option<Self> {
75 match raw.trim().to_ascii_lowercase().as_str() {
76 "none" => Some(Self::NoReasoning),
77 "low" => Some(Self::Low),
78 "medium" => Some(Self::Medium),
79 "high" => Some(Self::High),
80 "xhigh" => Some(Self::XHigh),
81 _ => None,
82 }
83 }
84
85 fn as_str(self) -> &'static str {
86 match self {
87 Self::NoReasoning => "none",
88 Self::Low => "low",
89 Self::Medium => "medium",
90 Self::High => "high",
91 Self::XHigh => "xhigh",
92 }
93 }
94}
95
96impl CodexServiceTier {
97 fn as_str(self) -> &'static str {
98 match self {
99 Self::Priority => "priority",
100 }
101 }
102}
103
/// In-memory copy of the most recently obtained OAuth access token.
#[allow(dead_code)]
struct CachedTokens {
    /// Bearer token handed out by `get_access_token` while still fresh.
    access_token: String,
    /// Refresh token that accompanied `access_token` when it was cached.
    #[allow(dead_code)]
    refresh_token: String,
    /// Monotonic deadline computed from the credential's Unix-seconds expiry at caching time.
    expires_at: std::time::Instant,
}
112
/// OAuth tokens obtained from the OpenAI auth server; (de)serializable with serde.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OAuthCredentials {
    /// OpenID Connect ID token, when the token endpoint returned one.
    #[serde(default)]
    pub id_token: Option<String>,
    /// ChatGPT workspace/account id, ultimately sent as the `ChatGPT-Account-ID` header.
    #[serde(default)]
    pub chatgpt_account_id: Option<String>,
    /// Bearer token used to authenticate API requests.
    pub access_token: String,
    /// Token exchanged at the token endpoint for a new access token.
    pub refresh_token: String,
    /// Expiry of `access_token`, in seconds since the Unix epoch.
    pub expires_at: u64,
}
124
/// PKCE (RFC 7636) verifier/challenge pair for the S256 method.
struct PkcePair {
    /// Secret kept by the client and sent as `code_verifier` during the code exchange.
    verifier: String,
    /// Unpadded base64url SHA-256 of `verifier`, sent as `code_challenge` in the authorization URL.
    challenge: String,
}
130
/// Per-tool-call bookkeeping used while parsing a streamed Responses API reply.
///
/// NOTE(review): the code that reads and updates these fields is outside this view; the
/// descriptions below are inferred from the field names — confirm against the SSE parser.
#[derive(Debug, Default)]
struct ResponsesToolState {
    /// Identifier of the function call tracked by this entry.
    call_id: String,
    /// Function name, once known.
    name: Option<String>,
    /// Presumably set once a tool-call start has been emitted downstream.
    started: bool,
    /// Presumably set once the call has been completed.
    finished: bool,
    /// Argument text already emitted; presumably used to compute the next delta.
    emitted_arguments: String,
}
139
/// Incremental state for parsing a Responses API server-sent-event stream.
///
/// NOTE(review): the parsing logic lives outside this view; field descriptions are
/// inferred from their names — TODO confirm.
#[derive(Debug, Default)]
struct ResponsesSseParser {
    /// Presumably buffers a partial line not yet terminated by a newline.
    line_buffer: String,
    /// Presumably the `data:` lines of the event currently being assembled.
    event_data_lines: Vec<String>,
    /// Tool-call state, keyed by an item/call identifier (key meaning unconfirmed).
    tools: HashMap<String, ResponsesToolState>,
}
146
/// WebSocket connection that exchanges JSON events with an OpenAI Responses/Realtime endpoint.
///
/// The default stream type matches what `tokio_tungstenite::connect_async` produces.
pub struct OpenAiRealtimeConnection<S = MaybeTlsStream<TcpStream>> {
    stream: WebSocketStream<S>,
}
150
151impl<S> std::fmt::Debug for OpenAiRealtimeConnection<S> {
152 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153 f.debug_struct("OpenAiRealtimeConnection").finish()
154 }
155}
156
157impl<S> OpenAiRealtimeConnection<S>
158where
159 S: AsyncRead + AsyncWrite + Unpin,
160{
161 fn new(stream: WebSocketStream<S>) -> Self {
162 Self { stream }
163 }
164
165 pub async fn send_event(&mut self, event: &Value) -> Result<()> {
166 let payload = serde_json::to_string(event).context("Failed to serialize Realtime event")?;
167 self.stream
168 .send(WsMessage::Text(payload.into()))
169 .await
170 .context("Failed to send Realtime event")?;
171 Ok(())
172 }
173
174 pub async fn recv_event(&mut self) -> Result<Option<Value>> {
175 while let Some(message) = self.stream.next().await {
176 let message = message.context("Realtime WebSocket receive failed")?;
177 match message {
178 WsMessage::Text(text) => {
179 let event = serde_json::from_str(&text)
180 .context("Failed to parse Realtime text event")?;
181 return Ok(Some(event));
182 }
183 WsMessage::Binary(bytes) => {
184 let text = String::from_utf8(bytes.to_vec())
185 .context("Realtime binary event was not valid UTF-8")?;
186 let event = serde_json::from_str(&text)
187 .context("Failed to parse Realtime binary event")?;
188 return Ok(Some(event));
189 }
190 WsMessage::Ping(payload) => {
191 self.stream
192 .send(WsMessage::Pong(payload))
193 .await
194 .context("Failed to respond to Realtime ping")?;
195 }
196 WsMessage::Pong(_) => {}
197 WsMessage::Frame(_) => {}
198 WsMessage::Close(_) => return Ok(None),
199 }
200 }
201
202 Ok(None)
203 }
204
205 pub async fn close(&mut self) -> Result<()> {
206 match self.stream.send(WsMessage::Close(None)).await {
207 Ok(()) => {}
208 Err(WsError::ConnectionClosed) | Err(WsError::AlreadyClosed) => {}
209 Err(WsError::Io(err))
210 if matches!(
211 err.kind(),
212 ErrorKind::BrokenPipe
213 | ErrorKind::ConnectionReset
214 | ErrorKind::ConnectionAborted
215 | ErrorKind::NotConnected
216 ) => {}
217 Err(err) => return Err(err).context("Failed to close Realtime WebSocket"),
218 }
219 Ok(())
220 }
221}
222
/// Provider for OpenAI Codex models.
///
/// Authenticates either with a static API key (public OpenAI backend) or with ChatGPT OAuth
/// credentials (ChatGPT Codex backend).
pub struct OpenAiCodexProvider {
    /// HTTP client obtained from the crate's shared client.
    client: Client,
    /// Access token cached by `get_access_token`.
    cached_tokens: Arc<RwLock<Option<CachedTokens>>>,
    /// When set, returned verbatim as the bearer token; its absence selects the ChatGPT backend.
    static_api_key: Option<String>,
    /// ChatGPT workspace/account id used for the `ChatGPT-Account-ID` header.
    chatgpt_account_id: Option<String>,
    /// OAuth credentials supplied at construction; used or refreshed to obtain access tokens.
    stored_credentials: Option<OAuthCredentials>,
}
231
232impl std::fmt::Debug for OpenAiCodexProvider {
233 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234 f.debug_struct("OpenAiCodexProvider")
235 .field("has_api_key", &self.static_api_key.is_some())
236 .field("has_chatgpt_account_id", &self.chatgpt_account_id.is_some())
237 .field("has_credentials", &self.stored_credentials.is_some())
238 .finish()
239 }
240}
241
242impl Default for OpenAiCodexProvider {
243 fn default() -> Self {
244 Self::new()
245 }
246}
247
248impl OpenAiCodexProvider {
249 fn chatgpt_supported_models() -> &'static [&'static str] {
250 &[
251 "gpt-5",
252 "gpt-5-mini",
253 "gpt-5.1-codex",
254 "gpt-5.2",
255 "gpt-5.3-codex",
256 "gpt-5.4",
257 "gpt-5.5",
258 "gpt-5.5-fast",
259 "o3",
260 "o4-mini",
261 ]
262 }
263
264 fn model_is_supported_by_backend(&self, model: &str) -> bool {
265 if !self.using_chatgpt_backend() {
266 return true;
267 }
268 Self::chatgpt_supported_models().contains(&model)
269 }
270
271 fn validate_model_for_backend(&self, model: &str) -> Result<()> {
272 let (resolved_model, _, _) =
273 Self::resolve_model_and_reasoning_effort_and_service_tier(model);
274 if self.model_is_supported_by_backend(model)
275 || self.model_is_supported_by_backend(&resolved_model)
276 {
277 return Ok(());
278 }
279
280 if self.using_chatgpt_backend() {
281 anyhow::bail!(
282 "Model '{}' is not supported when using Codex with a ChatGPT account. Supported models: {}",
283 model,
284 Self::chatgpt_supported_models().join(", ")
285 );
286 }
287
288 Ok(())
289 }
290
291 pub fn from_api_key(api_key: String) -> Self {
292 Self {
293 client: crate::provider::shared_http::shared_client().clone(),
294 cached_tokens: Arc::new(RwLock::new(None)),
295 static_api_key: Some(api_key),
296 chatgpt_account_id: None,
297 stored_credentials: None,
298 }
299 }
300
301 pub fn from_credentials(credentials: OAuthCredentials) -> Self {
303 let chatgpt_account_id = credentials
304 .chatgpt_account_id
305 .clone()
306 .or_else(|| {
307 credentials
308 .id_token
309 .as_deref()
310 .and_then(Self::extract_chatgpt_account_id_from_jwt)
311 })
312 .or_else(|| Self::extract_chatgpt_account_id_from_jwt(&credentials.access_token));
313
314 Self {
315 client: crate::provider::shared_http::shared_client().clone(),
316 cached_tokens: Arc::new(RwLock::new(None)),
317 static_api_key: None,
318 chatgpt_account_id,
319 stored_credentials: Some(credentials),
320 }
321 }
322
323 #[allow(dead_code)]
325 pub fn new() -> Self {
326 Self {
327 client: crate::provider::shared_http::shared_client().clone(),
328 cached_tokens: Arc::new(RwLock::new(None)),
329 static_api_key: None,
330 chatgpt_account_id: None,
331 stored_credentials: None,
332 }
333 }
334
335 fn generate_pkce() -> PkcePair {
337 let random_bytes: [u8; 32] = {
338 let timestamp = std::time::SystemTime::now()
339 .duration_since(std::time::UNIX_EPOCH)
340 .map(|d| d.as_nanos())
341 .unwrap_or(0);
342
343 let mut bytes = [0u8; 32];
344 let ts_bytes = timestamp.to_le_bytes();
345 let tid = std::thread::current().id();
346 let tid_repr = format!("{:?}", tid);
347 let tid_hash = Sha256::digest(tid_repr.as_bytes());
348
349 bytes[0..8].copy_from_slice(&ts_bytes[0..8]);
350 bytes[8..24].copy_from_slice(&tid_hash[0..16]);
351 bytes[24..].copy_from_slice(&Sha256::digest(ts_bytes)[0..8]);
352 bytes
353 };
354 let verifier = URL_SAFE_NO_PAD.encode(random_bytes);
355
356 let mut hasher = Sha256::new();
357 hasher.update(verifier.as_bytes());
358 let challenge_bytes = hasher.finalize();
359 let challenge = URL_SAFE_NO_PAD.encode(challenge_bytes);
360
361 PkcePair {
362 verifier,
363 challenge,
364 }
365 }
366
367 fn generate_state() -> String {
369 let timestamp = std::time::SystemTime::now()
370 .duration_since(std::time::UNIX_EPOCH)
371 .map(|d| d.as_nanos())
372 .unwrap_or(0);
373 let random: [u8; 8] = {
374 let ptr = Box::into_raw(Box::new(timestamp)) as usize;
375 let bytes = ptr.to_le_bytes();
376 let mut arr = [0u8; 8];
377 arr.copy_from_slice(&bytes);
378 arr
379 };
380 format!("{:016x}{:016x}", timestamp, u64::from_le_bytes(random))
381 }
382
383 #[allow(dead_code)]
385 pub fn get_authorization_url() -> (String, String, String) {
386 let pkce = Self::generate_pkce();
387 let state = Self::generate_state();
388
389 let url = format!(
390 "{}?response_type=code&client_id={}&redirect_uri={}&scope={}&code_challenge={}&code_challenge_method=S256&state={}&id_token_add_organizations=true&codex_cli_simplified_flow=true&originator=codex_cli_rs",
391 AUTHORIZE_URL,
392 CLIENT_ID,
393 urlencoding::encode(REDIRECT_URI),
394 urlencoding::encode(SCOPE),
395 pkce.challenge,
396 state
397 );
398
399 (url, pkce.verifier, state)
400 }
401
402 pub fn oauth_issuer() -> &'static str {
403 AUTH_ISSUER
404 }
405
406 pub fn oauth_client_id() -> &'static str {
407 CLIENT_ID
408 }
409
410 fn responses_ws_url_with_base(base_url: &str) -> String {
411 base_url.to_string()
412 }
413
414 pub fn responses_ws_url() -> String {
415 Self::responses_ws_url_with_base(OPENAI_RESPONSES_WS_URL)
416 }
417
418 fn build_responses_ws_request_with_base_url_and_account_id(
419 base_url: &str,
420 token: &str,
421 chatgpt_account_id: Option<&str>,
422 ) -> Result<Request<()>> {
423 if token.trim().is_empty() {
424 anyhow::bail!("Responses WebSocket token cannot be empty");
425 }
426
427 let url = Self::responses_ws_url_with_base(base_url);
428 let mut request = url
429 .into_client_request()
430 .context("Failed to build Responses WebSocket request")?;
431 request.headers_mut().insert(
432 "Authorization",
433 HeaderValue::from_str(&format!("Bearer {token}"))
434 .context("Failed to build Responses Authorization header")?,
435 );
436 request.headers_mut().insert(
437 "User-Agent",
438 HeaderValue::from_static("codetether-responses-ws/1.0"),
439 );
440 if let Some(account_id) = chatgpt_account_id.filter(|id| !id.trim().is_empty()) {
441 request.headers_mut().insert(
442 "ChatGPT-Account-ID",
443 HeaderValue::from_str(account_id)
444 .context("Failed to build ChatGPT account header")?,
445 );
446 }
447 Ok(request)
448 }
449
450 #[cfg(test)]
451 fn build_responses_ws_request_with_base_url(
452 base_url: &str,
453 token: &str,
454 ) -> Result<Request<()>> {
455 Self::build_responses_ws_request_with_base_url_and_account_id(base_url, token, None)
456 }
457
458 fn build_responses_ws_request_with_account_id(
459 token: &str,
460 chatgpt_account_id: Option<&str>,
461 ) -> Result<Request<()>> {
462 Self::build_responses_ws_request_with_base_url_and_account_id(
463 OPENAI_RESPONSES_WS_URL,
464 token,
465 chatgpt_account_id,
466 )
467 }
468
469 fn build_chatgpt_responses_ws_request(
470 token: &str,
471 chatgpt_account_id: Option<&str>,
472 ) -> Result<Request<()>> {
473 Self::build_responses_ws_request_with_base_url_and_account_id(
474 CHATGPT_CODEX_RESPONSES_WS_URL,
475 token,
476 chatgpt_account_id,
477 )
478 }
479
480 async fn connect_responses_ws_with_token(
481 &self,
482 token: &str,
483 chatgpt_account_id: Option<&str>,
484 backend: ResponsesWsBackend,
485 ) -> Result<OpenAiRealtimeConnection> {
486 let request = match backend {
487 ResponsesWsBackend::OpenAi => {
488 Self::build_responses_ws_request_with_account_id(token, chatgpt_account_id)?
489 }
490 ResponsesWsBackend::ChatGptCodex => {
491 Self::build_chatgpt_responses_ws_request(token, chatgpt_account_id)?
492 }
493 };
494 let (stream, _response) = connect_async(request)
495 .await
496 .context("Failed to connect to OpenAI Responses WebSocket")?;
497 Ok(OpenAiRealtimeConnection::new(stream))
498 }
499
500 pub async fn connect_responses_ws(&self) -> Result<OpenAiRealtimeConnection> {
501 let token = self.get_access_token().await?;
502 let account_id = self.resolved_chatgpt_account_id(&token);
503 let backend = if self.using_chatgpt_backend() {
504 ResponsesWsBackend::ChatGptCodex
505 } else {
506 ResponsesWsBackend::OpenAi
507 };
508 self.connect_responses_ws_with_token(&token, account_id.as_deref(), backend)
509 .await
510 }
511
512 #[allow(dead_code)]
514 pub async fn exchange_code(code: &str, verifier: &str) -> Result<OAuthCredentials> {
515 Self::exchange_code_with_redirect_uri(code, verifier, REDIRECT_URI).await
516 }
517
518 pub async fn exchange_code_with_redirect_uri(
520 code: &str,
521 verifier: &str,
522 redirect_uri: &str,
523 ) -> Result<OAuthCredentials> {
524 let client = crate::provider::shared_http::shared_client().clone();
525 let form_body = format!(
526 "grant_type={}&client_id={}&code={}&code_verifier={}&redirect_uri={}",
527 urlencoding::encode("authorization_code"),
528 CLIENT_ID,
529 urlencoding::encode(code),
530 urlencoding::encode(verifier),
531 urlencoding::encode(redirect_uri),
532 );
533
534 let response = client
535 .post(TOKEN_URL)
536 .header("Content-Type", "application/x-www-form-urlencoded")
537 .body(form_body)
538 .send()
539 .await
540 .context("Failed to exchange authorization code")?;
541
542 if !response.status().is_success() {
543 let body = response.text().await.unwrap_or_default();
544 anyhow::bail!("OAuth token exchange failed: {}", body);
545 }
546
547 #[derive(Deserialize)]
548 struct TokenResponse {
549 #[serde(default)]
550 id_token: Option<String>,
551 access_token: String,
552 refresh_token: String,
553 expires_in: u64,
554 }
555
556 let tokens: TokenResponse = response
557 .json()
558 .await
559 .context("Failed to parse token response")?;
560
561 let expires_at = std::time::SystemTime::now()
562 .duration_since(std::time::UNIX_EPOCH)
563 .context("System time error")?
564 .as_secs()
565 + tokens.expires_in;
566
567 let chatgpt_account_id = tokens
568 .id_token
569 .as_deref()
570 .and_then(Self::extract_chatgpt_account_id_from_jwt);
571
572 Ok(OAuthCredentials {
573 id_token: tokens.id_token,
574 chatgpt_account_id,
575 access_token: tokens.access_token,
576 refresh_token: tokens.refresh_token,
577 expires_at,
578 })
579 }
580
581 pub async fn exchange_id_token_for_api_key(id_token: &str) -> Result<String> {
582 #[derive(Deserialize)]
583 struct ExchangeResponse {
584 access_token: String,
585 }
586
587 let client = crate::provider::shared_http::shared_client().clone();
588 let form_body = format!(
589 "grant_type={}&client_id={}&requested_token={}&subject_token={}&subject_token_type={}",
590 urlencoding::encode("urn:ietf:params:oauth:grant-type:token-exchange"),
591 urlencoding::encode(CLIENT_ID),
592 urlencoding::encode("openai-api-key"),
593 urlencoding::encode(id_token),
594 urlencoding::encode("urn:ietf:params:oauth:token-type:id_token"),
595 );
596
597 let response = client
598 .post(TOKEN_URL)
599 .header("Content-Type", "application/x-www-form-urlencoded")
600 .body(form_body)
601 .send()
602 .await
603 .context("Failed to exchange id_token for OpenAI API key")?;
604
605 let status = response.status();
606 if !status.is_success() {
607 let body = response.text().await.unwrap_or_default();
608 anyhow::bail!("API key token exchange failed ({}): {}", status, body);
609 }
610
611 let payload: ExchangeResponse = response
612 .json()
613 .await
614 .context("Failed to parse API key token exchange response")?;
615 if payload.access_token.trim().is_empty() {
616 anyhow::bail!("API key token exchange returned an empty access token");
617 }
618 Ok(payload.access_token)
619 }
620
621 async fn refresh_access_token(&self, refresh_token: &str) -> Result<OAuthCredentials> {
623 let form_body = format!(
624 "grant_type={}&refresh_token={}&client_id={}",
625 urlencoding::encode("refresh_token"),
626 urlencoding::encode(refresh_token),
627 CLIENT_ID,
628 );
629
630 let response = self
631 .client
632 .post(TOKEN_URL)
633 .header("Content-Type", "application/x-www-form-urlencoded")
634 .body(form_body)
635 .send()
636 .await
637 .context("Failed to refresh access token")?;
638
639 if !response.status().is_success() {
640 let body = response.text().await.unwrap_or_default();
641 anyhow::bail!("Token refresh failed: {}", body);
642 }
643
644 #[derive(Deserialize)]
645 struct TokenResponse {
646 access_token: String,
647 refresh_token: String,
648 expires_in: u64,
649 }
650
651 let tokens: TokenResponse = response
652 .json()
653 .await
654 .context("Failed to parse refresh response")?;
655
656 let expires_at = std::time::SystemTime::now()
657 .duration_since(std::time::UNIX_EPOCH)
658 .context("System time error")?
659 .as_secs()
660 + tokens.expires_in;
661
662 Ok(OAuthCredentials {
663 id_token: None,
664 chatgpt_account_id: self.chatgpt_account_id.clone(),
665 access_token: tokens.access_token,
666 refresh_token: tokens.refresh_token,
667 expires_at,
668 })
669 }
670
671 async fn get_access_token(&self) -> Result<String> {
673 if let Some(ref api_key) = self.static_api_key {
674 return Ok(api_key.clone());
675 }
676
677 {
678 let cache = self.cached_tokens.read().await;
679 if let Some(ref tokens) = *cache
680 && tokens
681 .expires_at
682 .duration_since(std::time::Instant::now())
683 .as_secs()
684 > 300
685 {
686 return Ok(tokens.access_token.clone());
687 }
688 }
689
690 let mut cache = self.cached_tokens.write().await;
691
692 let creds = if let Some(ref stored) = self.stored_credentials {
693 let now = std::time::SystemTime::now()
694 .duration_since(std::time::UNIX_EPOCH)
695 .context("System time error")?
696 .as_secs();
697
698 if stored.expires_at > now + 300 {
699 stored.clone()
700 } else {
701 self.refresh_access_token(&stored.refresh_token).await?
702 }
703 } else {
704 anyhow::bail!("No OAuth credentials available. Run OAuth flow first.");
705 };
706
707 let expires_in = creds.expires_at
708 - std::time::SystemTime::now()
709 .duration_since(std::time::UNIX_EPOCH)
710 .context("System time error")?
711 .as_secs();
712
713 let cached = CachedTokens {
714 access_token: creds.access_token.clone(),
715 refresh_token: creds.refresh_token.clone(),
716 expires_at: std::time::Instant::now() + std::time::Duration::from_secs(expires_in),
717 };
718
719 let token = cached.access_token.clone();
720 *cache = Some(cached);
721 Ok(token)
722 }
723
724 #[allow(dead_code)]
725 fn convert_messages(messages: &[Message]) -> Vec<Value> {
726 messages
727 .iter()
728 .map(|msg| {
729 let role = match msg.role {
730 Role::System => "system",
731 Role::User => "user",
732 Role::Assistant => "assistant",
733 Role::Tool => "tool",
734 };
735
736 match msg.role {
737 Role::Tool => {
738 if let Some(ContentPart::ToolResult {
739 tool_call_id,
740 content,
741 }) = msg.content.first()
742 {
743 json!({
744 "role": "tool",
745 "tool_call_id": tool_call_id,
746 "content": content
747 })
748 } else {
749 json!({ "role": role, "content": "" })
750 }
751 }
752 Role::Assistant => {
753 let text: String = msg
754 .content
755 .iter()
756 .filter_map(|p| match p {
757 ContentPart::Text { text } => Some(text.clone()),
758 _ => None,
759 })
760 .collect::<Vec<_>>()
761 .join("");
762
763 let tool_calls: Vec<Value> = msg
764 .content
765 .iter()
766 .filter_map(|p| match p {
767 ContentPart::ToolCall {
768 id,
769 name,
770 arguments,
771 ..
772 } => Some(json!({
773 "id": id,
774 "type": "function",
775 "function": {
776 "name": name,
777 "arguments": arguments
778 }
779 })),
780 _ => None,
781 })
782 .collect();
783
784 if tool_calls.is_empty() {
785 json!({ "role": "assistant", "content": text })
786 } else {
787 json!({
788 "role": "assistant",
789 "content": if text.is_empty() { Value::Null } else { json!(text) },
790 "tool_calls": tool_calls
791 })
792 }
793 }
794 _ => {
795 let text: String = msg
796 .content
797 .iter()
798 .filter_map(|p| match p {
799 ContentPart::Text { text } => Some(text.clone()),
800 _ => None,
801 })
802 .collect::<Vec<_>>()
803 .join("\n");
804 json!({ "role": role, "content": text })
805 }
806 }
807 })
808 .collect()
809 }
810
811 #[allow(dead_code)]
812 fn convert_tools(tools: &[ToolDefinition]) -> Vec<Value> {
813 tools
814 .iter()
815 .map(|t| {
816 json!({
817 "type": "function",
818 "function": {
819 "name": t.name,
820 "description": t.description,
821 "parameters": t.parameters
822 }
823 })
824 })
825 .collect()
826 }
827
828 fn convert_responses_tools(tools: &[ToolDefinition]) -> Vec<Value> {
829 tools
830 .iter()
831 .map(|t| {
832 json!({
833 "type": "function",
834 "name": t.name,
835 "description": t.description,
836 "strict": false,
837 "parameters": t.parameters
838 })
839 })
840 .collect()
841 }
842
843 fn extract_responses_instructions(messages: &[Message]) -> String {
844 let instructions = messages
845 .iter()
846 .filter(|msg| matches!(msg.role, Role::System))
847 .filter_map(|msg| {
848 let text = msg
849 .content
850 .iter()
851 .filter_map(|p| match p {
852 ContentPart::Text { text } => Some(text.clone()),
853 _ => None,
854 })
855 .collect::<Vec<_>>()
856 .join("\n");
857 (!text.is_empty()).then_some(text)
858 })
859 .collect::<Vec<_>>()
860 .join("\n\n");
861
862 if instructions.is_empty() {
863 DEFAULT_RESPONSES_INSTRUCTIONS.to_string()
864 } else {
865 instructions
866 }
867 }
868
869 fn convert_messages_to_responses_input(messages: &[Message]) -> Vec<Value> {
870 let mut input = Vec::new();
871 let mut known_tool_call_ids = std::collections::HashSet::new();
872
873 for msg in messages {
874 match msg.role {
875 Role::System => {}
876 Role::User => {
877 let text: String = msg
878 .content
879 .iter()
880 .filter_map(|p| match p {
881 ContentPart::Text { text } => Some(text.clone()),
882 _ => None,
883 })
884 .collect::<Vec<_>>()
885 .join("\n");
886 if !text.is_empty() {
887 input.push(json!({
888 "type": "message",
889 "role": "user",
890 "content": [{
891 "type": "input_text",
892 "text": text
893 }]
894 }));
895 }
896 }
897 Role::Assistant => {
898 let text: String = msg
899 .content
900 .iter()
901 .filter_map(|p| match p {
902 ContentPart::Text { text } => Some(text.clone()),
903 _ => None,
904 })
905 .collect::<Vec<_>>()
906 .join("");
907
908 if !text.is_empty() {
909 input.push(json!({
910 "type": "message",
911 "role": "assistant",
912 "content": [{
913 "type": "output_text",
914 "text": text
915 }]
916 }));
917 }
918
919 for part in &msg.content {
920 if let ContentPart::ToolCall {
921 id,
922 name,
923 arguments,
924 ..
925 } = part
926 {
927 known_tool_call_ids.insert(id.clone());
928 input.push(json!({
929 "type": "function_call",
930 "call_id": id,
931 "name": name,
932 "arguments": arguments,
933 }));
934 }
935 }
936 }
937 Role::Tool => {
938 for part in &msg.content {
939 if let ContentPart::ToolResult {
940 tool_call_id,
941 content,
942 } = part
943 {
944 if known_tool_call_ids.contains(tool_call_id) {
945 input.push(json!({
946 "type": "function_call_output",
947 "call_id": tool_call_id,
948 "output": content,
949 }));
950 } else {
951 tracing::warn!(
952 tool_call_id = %tool_call_id,
953 "Skipping orphaned function_call_output while building Codex responses input"
954 );
955 }
956 }
957 }
958 }
959 }
960 }
961
962 input
963 }
964
965 fn using_chatgpt_backend(&self) -> bool {
966 self.static_api_key.is_none()
967 }
968
969 fn extract_chatgpt_account_id_from_jwt(jwt: &str) -> Option<String> {
970 let payload_b64 = jwt.split('.').nth(1)?;
971 let payload = URL_SAFE_NO_PAD.decode(payload_b64).ok()?;
972 let value: Value = serde_json::from_slice(&payload).ok()?;
973
974 value
975 .get("https://api.openai.com/auth")
976 .and_then(Value::as_object)
977 .and_then(|auth| auth.get("chatgpt_account_id"))
978 .and_then(Value::as_str)
979 .map(|s| s.to_string())
980 .or_else(|| {
981 value
982 .get("chatgpt_account_id")
983 .and_then(Value::as_str)
984 .map(|s| s.to_string())
985 })
986 .or_else(|| {
987 value
988 .get("organizations")
989 .and_then(Value::as_array)
990 .and_then(|orgs| orgs.first())
991 .and_then(|org| org.get("id"))
992 .and_then(Value::as_str)
993 .map(|s| s.to_string())
994 })
995 }
996
997 pub fn extract_chatgpt_account_id(token: &str) -> Option<String> {
998 Self::extract_chatgpt_account_id_from_jwt(token)
999 }
1000
1001 fn resolved_chatgpt_account_id(&self, access_token: &str) -> Option<String> {
1002 self.chatgpt_account_id
1003 .clone()
1004 .or_else(|| Self::extract_chatgpt_account_id_from_jwt(access_token))
1005 .or_else(|| {
1006 self.stored_credentials.as_ref().and_then(|creds| {
1007 creds
1008 .id_token
1009 .as_deref()
1010 .and_then(Self::extract_chatgpt_account_id_from_jwt)
1011 .or_else(|| creds.chatgpt_account_id.clone())
1012 })
1013 })
1014 }
1015
1016 fn parse_responses_usage(usage: Option<&Value>) -> Usage {
1017 let Some(usage) = usage else {
1018 return Usage::default();
1019 };
1020
1021 let prompt_tokens = usage
1022 .get("prompt_tokens")
1023 .or_else(|| usage.get("input_tokens"))
1024 .and_then(Value::as_u64)
1025 .unwrap_or(0) as usize;
1026
1027 let completion_tokens = usage
1028 .get("completion_tokens")
1029 .or_else(|| usage.get("output_tokens"))
1030 .and_then(Value::as_u64)
1031 .unwrap_or(0) as usize;
1032
1033 let total_tokens = usage
1034 .get("total_tokens")
1035 .and_then(Value::as_u64)
1036 .unwrap_or((prompt_tokens + completion_tokens) as u64)
1037 as usize;
1038
1039 let cached_tokens = usage
1046 .get("prompt_tokens_details")
1047 .or_else(|| usage.get("input_tokens_details"))
1048 .and_then(|d| d.get("cached_tokens"))
1049 .and_then(Value::as_u64)
1050 .unwrap_or(0) as usize;
1051
1052 Usage {
1053 prompt_tokens: prompt_tokens.saturating_sub(cached_tokens),
1054 completion_tokens,
1055 total_tokens,
1056 cache_read_tokens: if cached_tokens > 0 {
1057 Some(cached_tokens)
1058 } else {
1059 None
1060 },
1061 cache_write_tokens: None,
1062 }
1063 }
1064
1065 fn extract_json_string(value: Option<&Value>) -> Option<String> {
1066 match value {
1067 Some(Value::String(text)) => Some(text.clone()),
1068 Some(other) => serde_json::to_string(other).ok(),
1069 None => None,
1070 }
1071 }
1072
1073 #[allow(dead_code)]
1074 fn parse_responses_output_parts(output: &[Value]) -> (Vec<ContentPart>, bool) {
1075 let mut parts = Vec::new();
1076 let mut has_tool_calls = false;
1077
1078 for item in output {
1079 match item.get("type").and_then(Value::as_str) {
1080 Some("message") => {
1081 if let Some(content) = item.get("content").and_then(Value::as_array) {
1082 let text = content
1083 .iter()
1084 .filter(|segment| {
1085 matches!(
1086 segment.get("type").and_then(Value::as_str),
1087 Some("output_text") | Some("text")
1088 )
1089 })
1090 .filter_map(|segment| {
1091 segment
1092 .get("text")
1093 .and_then(Value::as_str)
1094 .map(str::to_string)
1095 })
1096 .collect::<Vec<_>>()
1097 .join("");
1098 if !text.is_empty() {
1099 parts.push(ContentPart::Text { text });
1100 }
1101 }
1102 }
1103 Some("function_call") => {
1104 let call_id = item
1105 .get("call_id")
1106 .or_else(|| item.get("id"))
1107 .and_then(Value::as_str);
1108 let name = item.get("name").and_then(Value::as_str);
1109 let arguments = item.get("arguments").and_then(Value::as_str);
1110 if let (Some(call_id), Some(name), Some(arguments)) = (call_id, name, arguments)
1111 {
1112 has_tool_calls = true;
1113 parts.push(ContentPart::ToolCall {
1114 id: call_id.to_string(),
1115 name: name.to_string(),
1116 arguments: arguments.to_string(),
1117 thought_signature: None,
1118 });
1119 }
1120 }
1121 _ => {}
1122 }
1123 }
1124
1125 (parts, has_tool_calls)
1126 }
1127
1128 fn parse_model_thinking_level(model: &str) -> (String, Option<ThinkingLevel>) {
1129 let Some((base_model, level_str)) = model.rsplit_once(':') else {
1130 return (model.to_string(), None);
1131 };
1132 let Some(level) = ThinkingLevel::parse(level_str) else {
1133 return (model.to_string(), None);
1134 };
1135 if base_model.trim().is_empty() {
1136 return (model.to_string(), None);
1137 }
1138 (base_model.to_string(), Some(level))
1139 }
1140
1141 fn env_thinking_level() -> Option<ThinkingLevel> {
1142 std::env::var(THINKING_LEVEL_ENV)
1143 .ok()
1144 .or_else(|| std::env::var(REASONING_EFFORT_ENV).ok())
1145 .as_deref()
1146 .and_then(ThinkingLevel::parse)
1147 }
1148
1149 fn model_supports_reasoning_effort(model: &str) -> bool {
1150 let normalized = model.to_ascii_lowercase();
1151 normalized.starts_with("gpt-5")
1152 || normalized.starts_with("o1")
1153 || normalized.starts_with("o3")
1154 || normalized.starts_with("o4")
1155 }
1156
1157 fn parse_service_tier_model_alias(model: &str) -> (String, Option<CodexServiceTier>) {
1158 match model {
1159 "gpt-5.4-fast" => ("gpt-5.4".to_string(), Some(CodexServiceTier::Priority)),
1161 "gpt-5.5-fast" => ("gpt-5.5".to_string(), Some(CodexServiceTier::Priority)),
1162 _ => (model.to_string(), None),
1163 }
1164 }
1165
1166 #[cfg(test)]
1167 fn resolve_model_and_reasoning_effort(model: &str) -> (String, Option<ThinkingLevel>) {
1168 let (base_model, level, _) =
1169 Self::resolve_model_and_reasoning_effort_and_service_tier(model);
1170 (base_model, level)
1171 }
1172
1173 fn resolve_model_and_reasoning_effort_and_service_tier(
1174 model: &str,
1175 ) -> (String, Option<ThinkingLevel>, Option<CodexServiceTier>) {
1176 let (base_model, level_from_model) = Self::parse_model_thinking_level(model);
1177 let level = level_from_model.or_else(Self::env_thinking_level);
1178 let (base_model, service_tier) = Self::parse_service_tier_model_alias(&base_model);
1179 if !Self::model_supports_reasoning_effort(&base_model) {
1180 return (base_model, None, service_tier);
1181 }
1182 (base_model, level, service_tier)
1183 }
1184
1185 fn apply_service_tier(payload: &mut Value, service_tier: Option<CodexServiceTier>) {
1186 if let Some(service_tier) = service_tier {
1187 payload["service_tier"] = json!(service_tier.as_str());
1188 }
1189 }
1190
1191 fn model_info(
1192 id: &str,
1193 name: &str,
1194 context_window: usize,
1195 max_output_tokens: usize,
1196 supports_vision: bool,
1197 ) -> ModelInfo {
1198 ModelInfo {
1199 id: id.to_string(),
1200 name: name.to_string(),
1201 provider: "openai-codex".to_string(),
1202 context_window,
1203 max_output_tokens: Some(max_output_tokens),
1204 supports_vision,
1205 supports_tools: true,
1206 supports_streaming: true,
1207 input_cost_per_million: Some(0.0),
1208 output_cost_per_million: Some(0.0),
1209 }
1210 }
1211
/// Model ids that are withheld from the listing when authenticating with an API key.
fn api_key_hidden_model(model: &str) -> bool {
    ["gpt-5.5", "gpt-5.5-fast"].contains(&model)
}
1215
1216 fn format_openai_api_error(status: StatusCode, body: &str, model: &str) -> String {
1217 if status == StatusCode::UNAUTHORIZED && body.contains("Missing scopes: model.request") {
1218 return format!(
1219 "OpenAI Codex OAuth token is missing required scope `model.request` for model `{model}`.\n\
1220 Re-run `codetether auth codex` and complete OAuth approval with a ChatGPT subscription account \
1221 that has model access in your org/project."
1222 );
1223 }
1224 if status == StatusCode::UNAUTHORIZED
1225 && (body.contains("chatgpt-account-id")
1226 || body.contains("ChatGPT-Account-ID")
1227 || body.contains("workspace"))
1228 {
1229 return "OpenAI Codex auth is missing a ChatGPT workspace/account identifier.\n\
1230 Re-run `codetether auth codex --device-code` and sign in to the intended workspace."
1231 .to_string();
1232 }
1233 format!("OpenAI API error ({status}): {body}")
1234 }
1235
#[cfg(test)]
/// Test helper: builds the WebSocket `response.create` event for the public OpenAI backend.
fn build_responses_ws_create_event(
    request: &CompletionRequest,
    model: &str,
    reasoning_effort: Option<ThinkingLevel>,
    service_tier: Option<CodexServiceTier>,
) -> Value {
    let backend = ResponsesWsBackend::OpenAi;
    Self::build_responses_ws_create_event_for_backend(
        request,
        model,
        reasoning_effort,
        service_tier,
        backend,
    )
}
1251
1252 fn build_responses_ws_create_event_for_backend(
1253 request: &CompletionRequest,
1254 model: &str,
1255 reasoning_effort: Option<ThinkingLevel>,
1256 service_tier: Option<CodexServiceTier>,
1257 backend: ResponsesWsBackend,
1258 ) -> Value {
1259 let instructions = Self::extract_responses_instructions(&request.messages);
1260 let input = Self::convert_messages_to_responses_input(&request.messages);
1261 let tools = Self::convert_responses_tools(&request.tools);
1262
1263 let mut event = json!({
1264 "type": "response.create",
1265 "model": model,
1266 "store": false,
1267 "instructions": instructions,
1268 "input": input,
1269 });
1270
1271 if !tools.is_empty() {
1272 event["tools"] = json!(tools);
1273 }
1274 if let Some(level) = reasoning_effort {
1275 event["reasoning"] = json!({ "effort": level.as_str() });
1276 }
1277 Self::apply_service_tier(&mut event, service_tier);
1278 if backend == ResponsesWsBackend::OpenAi {
1279 event["tool_choice"] = json!("auto");
1280 event["parallel_tool_calls"] = json!(true);
1281 }
1282 if backend == ResponsesWsBackend::OpenAi
1283 && let Some(max_tokens) = request.max_tokens
1284 {
1285 event["max_output_tokens"] = json!(max_tokens);
1286 }
1287
1288 event
1289 }
1290
1291 fn finish_responses_tool_call(
1292 parser: &mut ResponsesSseParser,
1293 key: &str,
1294 chunks: &mut Vec<StreamChunk>,
1295 ) {
1296 if let Some(state) = parser.tools.get_mut(key)
1297 && !state.finished
1298 {
1299 chunks.push(StreamChunk::ToolCallEnd {
1300 id: state.call_id.clone(),
1301 });
1302 state.finished = true;
1303 }
1304 }
1305
1306 fn parse_responses_event(
1307 parser: &mut ResponsesSseParser,
1308 event: &Value,
1309 chunks: &mut Vec<StreamChunk>,
1310 ) {
1311 match event.get("type").and_then(Value::as_str) {
1312 Some("response.output_text.delta") => {
1313 if let Some(delta) = event.get("delta").and_then(Value::as_str) {
1314 chunks.push(StreamChunk::Text(delta.to_string()));
1315 }
1316 }
1317 Some("response.output_item.added") => {
1318 if let Some(item) = event.get("item") {
1319 Self::record_responses_tool_item(parser, item, chunks, false);
1320 }
1321 }
1322 Some("response.function_call_arguments.delta") => {
1323 Self::record_responses_tool_arguments(parser, &event, chunks, false);
1324 }
1325 Some("response.function_call_arguments.done") => {
1326 Self::record_responses_tool_arguments(parser, &event, chunks, true);
1327 }
1328 Some("response.output_item.done") => {
1329 if let Some(item) = event.get("item")
1330 && item.get("type").and_then(Value::as_str) == Some("function_call")
1331 && let Some(key) = item
1332 .get("id")
1333 .and_then(Value::as_str)
1334 .or_else(|| item.get("call_id").and_then(Value::as_str))
1335 {
1336 Self::record_responses_tool_item(parser, item, chunks, true);
1337 Self::finish_responses_tool_call(parser, key, chunks);
1338 }
1339 }
1340 Some("response.completed") | Some("response.done")
1341 if event.pointer("/response/status").and_then(Value::as_str)
1342 != Some("in_progress") =>
1343 {
1344 if let Some(output) = event.pointer("/response/output").and_then(Value::as_array) {
1345 for item in output {
1346 if item.get("type").and_then(Value::as_str) == Some("function_call")
1347 && let Some(key) = item
1348 .get("id")
1349 .and_then(Value::as_str)
1350 .or_else(|| item.get("call_id").and_then(Value::as_str))
1351 {
1352 Self::record_responses_tool_item(parser, item, chunks, true);
1353 Self::finish_responses_tool_call(parser, key, chunks);
1354 }
1355 }
1356 }
1357 let failed_message = event
1358 .get("response")
1359 .and_then(|response| response.get("status"))
1360 .and_then(Value::as_str)
1361 .filter(|status| matches!(*status, "failed" | "cancelled" | "incomplete"))
1362 .map(|_| {
1363 event
1364 .get("response")
1365 .and_then(|response| response.get("error"))
1366 .and_then(|error| error.get("message"))
1367 .and_then(Value::as_str)
1368 .unwrap_or("Response failed")
1369 .to_string()
1370 });
1371 if let Some(message) = failed_message {
1372 chunks.push(StreamChunk::Error(message));
1373 return;
1374 }
1375 let usage = event
1376 .get("response")
1377 .and_then(|response| response.get("usage"))
1378 .map(|usage| Self::parse_responses_usage(Some(usage)));
1379 chunks.push(StreamChunk::Done { usage });
1380 }
1381 Some("response.failed") => {
1382 let message = event
1383 .get("response")
1384 .and_then(|response| response.get("error"))
1385 .and_then(|error| error.get("message"))
1386 .or_else(|| event.get("error").and_then(|error| error.get("message")))
1387 .and_then(Value::as_str)
1388 .unwrap_or("Response failed")
1389 .to_string();
1390 chunks.push(StreamChunk::Error(message));
1391 }
1392 Some("error") => {
1393 let message = event
1394 .get("error")
1395 .and_then(|error| error.get("message"))
1396 .or_else(|| event.get("message"))
1397 .and_then(Value::as_str)
1398 .unwrap_or("Realtime error")
1399 .to_string();
1400 chunks.push(StreamChunk::Error(message));
1401 }
1402 _ => {}
1403 }
1404 }
1405
1406 fn parse_responses_sse_event(
1407 parser: &mut ResponsesSseParser,
1408 data: &str,
1409 chunks: &mut Vec<StreamChunk>,
1410 ) {
1411 if data == "[DONE]" {
1412 chunks.push(StreamChunk::Done { usage: None });
1413 return;
1414 }
1415
1416 let Ok(event): Result<Value, _> = serde_json::from_str(data) else {
1417 return;
1418 };
1419
1420 Self::parse_responses_event(parser, &event, chunks);
1421 }
1422
1423 fn record_responses_tool_item(
1424 parser: &mut ResponsesSseParser,
1425 item: &Value,
1426 chunks: &mut Vec<StreamChunk>,
1427 include_arguments: bool,
1428 ) -> Option<String> {
1429 if item.get("type").and_then(Value::as_str) != Some("function_call") {
1430 return None;
1431 }
1432
1433 let key = item
1434 .get("id")
1435 .and_then(Value::as_str)
1436 .or_else(|| item.get("call_id").and_then(Value::as_str))?;
1437 let call_id = item
1438 .get("call_id")
1439 .and_then(Value::as_str)
1440 .or_else(|| item.get("id").and_then(Value::as_str))?
1441 .to_string();
1442 let name = item.get("name").and_then(Value::as_str).map(str::to_string);
1443 let arguments = if include_arguments {
1444 Self::extract_json_string(item.get("arguments"))
1445 } else {
1446 None
1447 };
1448
1449 let state = parser
1450 .tools
1451 .entry(key.to_string())
1452 .or_insert_with(|| ResponsesToolState {
1453 call_id: call_id.clone(),
1454 ..ResponsesToolState::default()
1455 });
1456 if state.call_id.is_empty() {
1457 state.call_id = call_id.clone();
1458 }
1459 if state.name.is_none() {
1460 state.name = name;
1461 }
1462 if !state.started
1463 && let Some(name) = state.name.clone()
1464 {
1465 chunks.push(StreamChunk::ToolCallStart {
1466 id: state.call_id.clone(),
1467 name,
1468 });
1469 state.started = true;
1470 }
1471 if let Some(arguments) = arguments {
1472 Self::emit_missing_responses_tool_arguments(state, arguments, chunks);
1473 }
1474
1475 Some(state.call_id.clone())
1476 }
1477
1478 fn record_responses_tool_arguments(
1479 parser: &mut ResponsesSseParser,
1480 event: &Value,
1481 chunks: &mut Vec<StreamChunk>,
1482 use_final_arguments: bool,
1483 ) {
1484 let key = event
1485 .get("item_id")
1486 .and_then(Value::as_str)
1487 .or_else(|| event.get("call_id").and_then(Value::as_str));
1488 let Some(key) = key else {
1489 return;
1490 };
1491
1492 let fallback_call_id = event
1493 .get("call_id")
1494 .and_then(Value::as_str)
1495 .unwrap_or(key)
1496 .to_string();
1497 let state = parser
1498 .tools
1499 .entry(key.to_string())
1500 .or_insert_with(|| ResponsesToolState {
1501 call_id: fallback_call_id.clone(),
1502 ..ResponsesToolState::default()
1503 });
1504 if state.call_id.is_empty() {
1505 state.call_id = fallback_call_id;
1506 }
1507
1508 if !state.started
1509 && let Some(name) = event.get("name").and_then(Value::as_str)
1510 {
1511 state.name = Some(name.to_string());
1512 chunks.push(StreamChunk::ToolCallStart {
1513 id: state.call_id.clone(),
1514 name: name.to_string(),
1515 });
1516 state.started = true;
1517 }
1518
1519 let arguments = if use_final_arguments {
1520 Self::extract_json_string(event.get("arguments")).or_else(|| {
1521 event
1522 .get("delta")
1523 .and_then(Value::as_str)
1524 .map(str::to_string)
1525 })
1526 } else {
1527 event
1528 .get("delta")
1529 .and_then(Value::as_str)
1530 .map(str::to_string)
1531 };
1532
1533 if let Some(arguments) = arguments {
1534 Self::emit_missing_responses_tool_arguments(state, arguments, chunks);
1535 }
1536 }
1537
1538 fn emit_missing_responses_tool_arguments(
1539 state: &mut ResponsesToolState,
1540 arguments: String,
1541 chunks: &mut Vec<StreamChunk>,
1542 ) {
1543 let delta = if arguments.starts_with(&state.emitted_arguments) {
1544 arguments[state.emitted_arguments.len()..].to_string()
1545 } else if state.emitted_arguments.is_empty() {
1546 arguments.clone()
1547 } else if arguments == state.emitted_arguments {
1548 String::new()
1549 } else {
1550 arguments.clone()
1551 };
1552
1553 if !delta.is_empty() {
1554 chunks.push(StreamChunk::ToolCallDelta {
1555 id: state.call_id.clone(),
1556 arguments_delta: delta.clone(),
1557 });
1558 state.emitted_arguments.push_str(&delta);
1559 }
1560 }
1561
1562 fn parse_responses_sse_bytes(
1563 parser: &mut ResponsesSseParser,
1564 bytes: &[u8],
1565 ) -> Vec<StreamChunk> {
1566 parser.line_buffer.push_str(&String::from_utf8_lossy(bytes));
1567 let mut chunks = Vec::new();
1568
1569 while let Some(line_end) = parser.line_buffer.find('\n') {
1570 let mut line = parser.line_buffer[..line_end].to_string();
1571 parser.line_buffer.drain(..=line_end);
1572
1573 if line.ends_with('\r') {
1574 line.pop();
1575 }
1576
1577 if line.is_empty() {
1578 if !parser.event_data_lines.is_empty() {
1579 let data = parser.event_data_lines.join("\n");
1580 parser.event_data_lines.clear();
1581 Self::parse_responses_sse_event(parser, &data, &mut chunks);
1582 }
1583 continue;
1584 }
1585
1586 if let Some(data) = line.strip_prefix("data:") {
1587 let data = data.strip_prefix(' ').unwrap_or(data);
1588 parser.event_data_lines.push(data.to_string());
1589 }
1590 }
1591
1592 chunks
1593 }
1594
1595 fn finish_responses_sse_parser(parser: &mut ResponsesSseParser) -> Vec<StreamChunk> {
1596 let mut chunks = Vec::new();
1597
1598 if !parser.line_buffer.is_empty() {
1599 let mut line = std::mem::take(&mut parser.line_buffer);
1600 if line.ends_with('\r') {
1601 line.pop();
1602 }
1603 if let Some(data) = line.strip_prefix("data:") {
1604 let data = data.strip_prefix(' ').unwrap_or(data);
1605 parser.event_data_lines.push(data.to_string());
1606 }
1607 }
1608
1609 if !parser.event_data_lines.is_empty() {
1610 let data = parser.event_data_lines.join("\n");
1611 parser.event_data_lines.clear();
1612 Self::parse_responses_sse_event(parser, &data, &mut chunks);
1613 }
1614
1615 chunks
1616 }
1617
1618 async fn complete_with_chatgpt_responses(
1619 &self,
1620 request: CompletionRequest,
1621 access_token: String,
1622 ) -> Result<CompletionResponse> {
1623 let stream = self
1624 .complete_stream_with_chatgpt_responses(request, access_token)
1625 .await?;
1626 Self::collect_stream_completion(stream).await
1627 }
1628
1629 async fn complete_with_openai_responses(
1630 &self,
1631 request: CompletionRequest,
1632 api_key: String,
1633 ) -> Result<CompletionResponse> {
1634 let stream = self
1635 .complete_stream_with_openai_responses(request, api_key)
1636 .await?;
1637 Self::collect_stream_completion(stream).await
1638 }
1639
1640 async fn collect_stream_completion(
1641 mut stream: BoxStream<'static, StreamChunk>,
1642 ) -> Result<CompletionResponse> {
1643 #[derive(Default)]
1644 struct ToolAccumulator {
1645 id: String,
1646 name: String,
1647 arguments: String,
1648 }
1649
1650 let mut text = String::new();
1651 let mut tools = Vec::<ToolAccumulator>::new();
1652 let mut tool_index_by_id = HashMap::<String, usize>::new();
1653 let mut usage = Usage::default();
1654
1655 while let Some(chunk) = stream.next().await {
1656 match chunk {
1657 StreamChunk::Text(delta) => text.push_str(&delta),
1658 StreamChunk::ToolCallStart { id, name } => {
1659 let next_idx = tools.len();
1660 let idx = *tool_index_by_id.entry(id.clone()).or_insert(next_idx);
1661 if idx == next_idx {
1662 tools.push(ToolAccumulator {
1663 id,
1664 name,
1665 arguments: String::new(),
1666 });
1667 } else if tools[idx].name == "tool" {
1668 tools[idx].name = name;
1669 }
1670 }
1671 StreamChunk::ToolCallDelta {
1672 id,
1673 arguments_delta,
1674 } => {
1675 if let Some(idx) = tool_index_by_id.get(&id).copied() {
1676 tools[idx].arguments.push_str(&arguments_delta);
1677 } else {
1678 let idx = tools.len();
1679 tool_index_by_id.insert(id.clone(), idx);
1680 tools.push(ToolAccumulator {
1681 id,
1682 name: "tool".to_string(),
1683 arguments: arguments_delta,
1684 });
1685 }
1686 }
1687 StreamChunk::ToolCallEnd { .. } | StreamChunk::Thinking(_) => {}
1688 StreamChunk::Done { usage: done_usage } => {
1689 if let Some(done_usage) = done_usage {
1690 usage = done_usage;
1691 }
1692 }
1693 StreamChunk::Error(message) => anyhow::bail!(message),
1694 }
1695 }
1696
1697 let mut content = Vec::new();
1698 if !text.is_empty() {
1699 content.push(ContentPart::Text { text });
1700 }
1701 for tool in tools {
1702 content.push(ContentPart::ToolCall {
1703 id: tool.id,
1704 name: tool.name,
1705 arguments: tool.arguments,
1706 thought_signature: None,
1707 });
1708 }
1709
1710 let finish_reason = if content
1711 .iter()
1712 .any(|part| matches!(part, ContentPart::ToolCall { .. }))
1713 {
1714 FinishReason::ToolCalls
1715 } else {
1716 FinishReason::Stop
1717 };
1718
1719 Ok(CompletionResponse {
1720 message: Message {
1721 role: Role::Assistant,
1722 content,
1723 },
1724 usage,
1725 finish_reason,
1726 })
1727 }
1728
1729 async fn complete_stream_with_chatgpt_responses(
1730 &self,
1731 request: CompletionRequest,
1732 access_token: String,
1733 ) -> Result<BoxStream<'static, StreamChunk>> {
1734 let account_id = self.resolved_chatgpt_account_id(&access_token).context(
1735 "OpenAI Codex OAuth token is missing ChatGPT workspace/account ID. Re-run `codetether auth codex --device-code`.",
1736 )?;
1737 match self
1738 .complete_stream_with_realtime(
1739 request.clone(),
1740 access_token.clone(),
1741 Some(account_id.clone()),
1742 "chatgpt-codex-responses-ws",
1743 ResponsesWsBackend::ChatGptCodex,
1744 )
1745 .await
1746 {
1747 Ok(stream) => Ok(stream),
1748 Err(error) => {
1749 tracing::warn!(
1750 error = %error,
1751 "Responses WebSocket connect failed for ChatGPT Codex backend; falling back to HTTP responses streaming"
1752 );
1753 self.complete_stream_with_chatgpt_http_responses(request, access_token, account_id)
1754 .await
1755 }
1756 }
1757 }
1758
1759 async fn complete_stream_with_openai_responses(
1760 &self,
1761 request: CompletionRequest,
1762 api_key: String,
1763 ) -> Result<BoxStream<'static, StreamChunk>> {
1764 match self
1765 .complete_stream_with_realtime(
1766 request.clone(),
1767 api_key.clone(),
1768 None,
1769 "openai-responses-ws",
1770 ResponsesWsBackend::OpenAi,
1771 )
1772 .await
1773 {
1774 Ok(stream) => Ok(stream),
1775 Err(error) => {
1776 tracing::warn!(
1777 error = %error,
1778 "Responses WebSocket connect failed for OpenAI backend; falling back to HTTP responses streaming"
1779 );
1780 self.complete_stream_with_openai_http_responses(request, api_key)
1781 .await
1782 }
1783 }
1784 }
1785
1786 async fn complete_stream_with_chatgpt_http_responses(
1787 &self,
1788 request: CompletionRequest,
1789 access_token: String,
1790 account_id: String,
1791 ) -> Result<BoxStream<'static, StreamChunk>> {
1792 let (model, reasoning_effort, service_tier) =
1793 Self::resolve_model_and_reasoning_effort_and_service_tier(&request.model);
1794 let instructions = Self::extract_responses_instructions(&request.messages);
1795 let input = Self::convert_messages_to_responses_input(&request.messages);
1796 let tools = Self::convert_responses_tools(&request.tools);
1797
1798 let mut body = json!({
1799 "model": model,
1800 "instructions": instructions,
1801 "input": input,
1802 "stream": true,
1803 "store": false,
1804 "tool_choice": "auto",
1805 "parallel_tool_calls": true,
1806 });
1807
1808 if !tools.is_empty() {
1809 body["tools"] = json!(tools);
1810 }
1811 if let Some(level) = reasoning_effort {
1812 body["reasoning"] = json!({ "effort": level.as_str() });
1813 }
1814 Self::apply_service_tier(&mut body, service_tier);
1815
1816 tracing::info!(
1817 backend = "chatgpt-codex-responses-http",
1818 instructions_len = body
1819 .get("instructions")
1820 .and_then(|v| v.as_str())
1821 .map(str::len)
1822 .unwrap_or(0),
1823 input_items = body
1824 .get("input")
1825 .and_then(|v| v.as_array())
1826 .map(Vec::len)
1827 .unwrap_or(0),
1828 has_tools = !tools.is_empty(),
1829 "Sending HTTP responses request"
1830 );
1831
1832 let response = self
1833 .client
1834 .post(format!("{}/responses", CHATGPT_CODEX_API_URL))
1835 .header("Authorization", format!("Bearer {}", access_token))
1836 .header("chatgpt-account-id", account_id)
1837 .header("Content-Type", "application/json")
1838 .json(&body)
1839 .send()
1840 .await
1841 .context("Failed to send streaming request to ChatGPT Codex backend")?;
1842
1843 let status = response.status();
1844 if !status.is_success() {
1845 let body = response.text().await.unwrap_or_default();
1846 anyhow::bail!(Self::format_openai_api_error(status, &body, &request.model));
1847 }
1848
1849 let stream = async_stream::stream! {
1850 let mut parser = ResponsesSseParser::default();
1851 let mut byte_stream = response.bytes_stream();
1852
1853 while let Some(result) = byte_stream.next().await {
1854 match result {
1855 Ok(bytes) => {
1856 for chunk in Self::parse_responses_sse_bytes(&mut parser, &bytes) {
1857 yield chunk;
1858 }
1859 }
1860 Err(error) => yield StreamChunk::Error(error.to_string()),
1861 }
1862 }
1863
1864 for chunk in Self::finish_responses_sse_parser(&mut parser) {
1865 yield chunk;
1866 }
1867 };
1868
1869 Ok(Box::pin(stream))
1870 }
1871
1872 async fn complete_stream_with_openai_http_responses(
1873 &self,
1874 request: CompletionRequest,
1875 api_key: String,
1876 ) -> Result<BoxStream<'static, StreamChunk>> {
1877 let (model, reasoning_effort, service_tier) =
1878 Self::resolve_model_and_reasoning_effort_and_service_tier(&request.model);
1879 let instructions = Self::extract_responses_instructions(&request.messages);
1880 let input = Self::convert_messages_to_responses_input(&request.messages);
1881 let tools = Self::convert_responses_tools(&request.tools);
1882
1883 let mut body = json!({
1884 "model": model,
1885 "instructions": instructions,
1886 "input": input,
1887 "stream": true,
1888 "store": false,
1889 "tool_choice": "auto",
1890 "parallel_tool_calls": true,
1891 });
1892
1893 if !tools.is_empty() {
1894 body["tools"] = json!(tools);
1895 }
1896 if let Some(level) = reasoning_effort {
1897 body["reasoning"] = json!({ "effort": level.as_str() });
1898 }
1899 Self::apply_service_tier(&mut body, service_tier);
1900
1901 tracing::info!(
1902 backend = "openai-responses-http",
1903 instructions_len = body
1904 .get("instructions")
1905 .and_then(|v| v.as_str())
1906 .map(str::len)
1907 .unwrap_or(0),
1908 input_items = body
1909 .get("input")
1910 .and_then(|v| v.as_array())
1911 .map(Vec::len)
1912 .unwrap_or(0),
1913 has_tools = !tools.is_empty(),
1914 "Sending HTTP responses request"
1915 );
1916
1917 let response = self
1918 .client
1919 .post(format!("{}/responses", OPENAI_API_URL))
1920 .header("Authorization", format!("Bearer {}", api_key))
1921 .header("Content-Type", "application/json")
1922 .json(&body)
1923 .send()
1924 .await
1925 .context("Failed to send streaming request to OpenAI responses API")?;
1926
1927 let status = response.status();
1928 if !status.is_success() {
1929 let body = response.text().await.unwrap_or_default();
1930 anyhow::bail!(Self::format_openai_api_error(status, &body, &request.model));
1931 }
1932
1933 let stream = async_stream::stream! {
1934 let mut parser = ResponsesSseParser::default();
1935 let mut byte_stream = response.bytes_stream();
1936
1937 while let Some(result) = byte_stream.next().await {
1938 match result {
1939 Ok(bytes) => {
1940 for chunk in Self::parse_responses_sse_bytes(&mut parser, &bytes) {
1941 yield chunk;
1942 }
1943 }
1944 Err(error) => yield StreamChunk::Error(error.to_string()),
1945 }
1946 }
1947
1948 for chunk in Self::finish_responses_sse_parser(&mut parser) {
1949 yield chunk;
1950 }
1951 };
1952
1953 Ok(Box::pin(stream))
1954 }
1955
1956 async fn complete_stream_with_realtime(
1957 &self,
1958 request: CompletionRequest,
1959 access_token: String,
1960 chatgpt_account_id: Option<String>,
1961 backend: &'static str,
1962 ws_backend: ResponsesWsBackend,
1963 ) -> Result<BoxStream<'static, StreamChunk>> {
1964 let (model, reasoning_effort, service_tier) =
1965 Self::resolve_model_and_reasoning_effort_and_service_tier(&request.model);
1966 let body = Self::build_responses_ws_create_event_for_backend(
1967 &request,
1968 &model,
1969 reasoning_effort,
1970 service_tier,
1971 ws_backend,
1972 );
1973 tracing::info!(
1974 backend = backend,
1975 instructions_len = body
1976 .get("instructions")
1977 .and_then(|v| v.as_str())
1978 .map(str::len)
1979 .unwrap_or(0),
1980 input_items = body
1981 .get("input")
1982 .and_then(|v| v.as_array())
1983 .map(Vec::len)
1984 .unwrap_or(0),
1985 has_tools = body
1986 .get("tools")
1987 .and_then(|v| v.as_array())
1988 .is_some_and(|tools| !tools.is_empty()),
1989 "Sending responses websocket request"
1990 );
1991
1992 let connection = self
1993 .connect_responses_ws_with_token(
1994 &access_token,
1995 chatgpt_account_id.as_deref(),
1996 ws_backend,
1997 )
1998 .await?;
1999
2000 let stream = async_stream::stream! {
2001 let mut connection = connection;
2002 let mut parser = ResponsesSseParser::default();
2003 if let Err(error) = connection.send_event(&body).await {
2004 yield StreamChunk::Error(error.to_string());
2005 let _ = connection.close().await;
2006 return;
2007 }
2008
2009 let mut saw_terminal = false;
2010 loop {
2011 match connection.recv_event().await {
2012 Ok(Some(event)) => {
2013 let mut chunks = Vec::new();
2014 Self::parse_responses_event(&mut parser, &event, &mut chunks);
2015 for chunk in chunks {
2016 if matches!(chunk, StreamChunk::Done { .. } | StreamChunk::Error(_)) {
2017 saw_terminal = true;
2018 }
2019 yield chunk;
2020 }
2021 if saw_terminal {
2022 break;
2023 }
2024 }
2025 Ok(None) => {
2026 if !saw_terminal {
2027 yield StreamChunk::Error("Realtime WebSocket closed before response completion".to_string());
2028 }
2029 break;
2030 }
2031 Err(error) => {
2032 yield StreamChunk::Error(error.to_string());
2033 break;
2034 }
2035 }
2036 }
2037
2038 let _ = connection.close().await;
2039 };
2040
2041 Ok(Box::pin(stream))
2042 }
2043}
2044
2045#[async_trait]
2046impl Provider for OpenAiCodexProvider {
2047 fn name(&self) -> &str {
2048 "openai-codex"
2049 }
2050
2051 async fn list_models(&self) -> Result<Vec<ModelInfo>> {
2052 let mut models = vec![
2053 Self::model_info("gpt-5", "GPT-5", 400_000, 128_000, false),
2054 Self::model_info("gpt-5-mini", "GPT-5 Mini", 264_000, 64_000, false),
2055 Self::model_info("gpt-5.1-codex", "GPT-5.1 Codex", 400_000, 128_000, false),
2056 Self::model_info("gpt-5.2", "GPT-5.2", 400_000, 128_000, false),
2057 Self::model_info("gpt-5.3-codex", "GPT-5.3 Codex", 400_000, 128_000, false),
2058 Self::model_info("gpt-5.4", "GPT-5.4", 272_000, 128_000, false),
2059 Self::model_info("gpt-5.4-fast", "GPT-5.4 Fast", 272_000, 128_000, false),
2060 Self::model_info("gpt-5.4-pro", "GPT-5.4 Pro", 272_000, 128_000, false),
2061 Self::model_info("gpt-5.5", "GPT-5.5", 400_000, 128_000, false),
2062 Self::model_info("gpt-5.5-fast", "GPT-5.5 Fast", 400_000, 128_000, false),
2063 Self::model_info("o3", "O3", 200_000, 100_000, true),
2064 Self::model_info("o4-mini", "O4 Mini", 200_000, 100_000, true),
2065 ];
2066
2067 if self.using_chatgpt_backend() {
2068 models.retain(|model| Self::chatgpt_supported_models().contains(&model.id.as_str()));
2069 } else {
2070 models.retain(|model| !Self::api_key_hidden_model(&model.id));
2071 }
2072
2073 Ok(models)
2074 }
2075
2076 async fn complete(&self, request: CompletionRequest) -> Result<CompletionResponse> {
2077 self.validate_model_for_backend(&request.model)?;
2078 let access_token = self.get_access_token().await?;
2079 if self.using_chatgpt_backend() {
2080 return self
2081 .complete_with_chatgpt_responses(request, access_token)
2082 .await;
2083 }
2084 self.complete_with_openai_responses(request, access_token)
2085 .await
2086 }
2087
2088 async fn complete_stream(
2089 &self,
2090 request: CompletionRequest,
2091 ) -> Result<BoxStream<'static, StreamChunk>> {
2092 self.validate_model_for_backend(&request.model)?;
2093 let access_token = self.get_access_token().await?;
2094 if self.using_chatgpt_backend() {
2095 return self
2096 .complete_stream_with_chatgpt_responses(request, access_token)
2097 .await;
2098 }
2099 self.complete_stream_with_openai_responses(request, access_token)
2100 .await
2101 }
2102}
2103
2104#[cfg(test)]
2105mod tests {
2106 use super::*;
2107 use futures::stream;
2108 use tokio::io::duplex;
2109 use tokio_tungstenite::{
2110 accept_hdr_async, client_async,
2111 tungstenite::{
2112 Message as WsMessage,
2113 handshake::server::{Request as ServerRequest, Response as ServerResponse},
2114 },
2115 };
2116
#[test]
fn test_generate_pkce() {
    // Both PKCE halves must be populated and must differ from each other.
    let generated = OpenAiCodexProvider::generate_pkce();
    let (verifier, challenge) = (&generated.verifier, &generated.challenge);
    assert!(!verifier.is_empty());
    assert!(!challenge.is_empty());
    assert_ne!(verifier, challenge);
}
2124
#[test]
fn test_generate_state() {
    let expected_len = 32;
    assert_eq!(OpenAiCodexProvider::generate_state().len(), expected_len);
}
2130
#[test]
fn formats_scope_error_with_actionable_message() {
    let scope_error_body = r#"{"error":{"message":"Missing scopes: model.request"}}"#;
    let formatted = OpenAiCodexProvider::format_openai_api_error(
        StatusCode::UNAUTHORIZED,
        scope_error_body,
        "gpt-5.3-codex",
    );
    for expected in ["model.request", "codetether auth codex"] {
        assert!(
            formatted.contains(expected),
            "expected `{expected}` in: {formatted}"
        );
    }
}
2142
#[test]
fn parses_model_suffix_for_thinking_level() {
    let cases = [
        ("gpt-5.3-codex:high", "gpt-5.3-codex", "high"),
        ("gpt-5.5:none", "gpt-5.5", "none"),
        ("gpt-5.5:xhigh", "gpt-5.5", "xhigh"),
    ];
    for (input, expected_model, expected_level) in cases {
        let (model, level) = OpenAiCodexProvider::resolve_model_and_reasoning_effort(input);
        assert_eq!(model, expected_model);
        assert_eq!(level.map(ThinkingLevel::as_str), Some(expected_level));
    }
}
2160
#[test]
fn maps_fast_model_alias_to_priority_service_tier() {
    // Each `-fast` alias resolves to its base model on the priority tier, keeping the effort.
    for (alias, base) in [("gpt-5.4-fast:high", "gpt-5.4"), ("gpt-5.5-fast:high", "gpt-5.5")] {
        let (model, level, service_tier) =
            OpenAiCodexProvider::resolve_model_and_reasoning_effort_and_service_tier(alias);
        assert_eq!(model, base);
        assert_eq!(level.map(ThinkingLevel::as_str), Some("high"));
        assert_eq!(service_tier.map(CodexServiceTier::as_str), Some("priority"));
    }
}
2179
#[test]
fn ignores_unknown_model_suffix() {
    // Unrecognized suffixes are left on the model id and yield no reasoning level.
    for unknown in ["gpt-5.3-codex:turbo", "gpt-5.5:minimal"] {
        let (model, level) = OpenAiCodexProvider::resolve_model_and_reasoning_effort(unknown);
        assert_eq!(model, unknown);
        assert_eq!(level, None);
    }
}
2192
#[test]
fn skips_reasoning_effort_for_non_reasoning_models() {
    let resolved = OpenAiCodexProvider::resolve_model_and_reasoning_effort("gpt-4o:high");
    assert_eq!(resolved.0, "gpt-4o");
    assert!(resolved.1.is_none());
}
2199
2200 #[tokio::test]
2201 async fn lists_chatgpt_codex_models() {
2202 let provider = OpenAiCodexProvider::new();
2203 let models = provider
2204 .list_models()
2205 .await
2206 .expect("model listing should succeed");
2207
2208 assert!(models.iter().any(|model| model.id == "gpt-5.4"));
2209 assert!(models.iter().any(|model| model.id == "gpt-5-mini"));
2210 assert!(!models.iter().any(|model| model.id == "gpt-5.4-fast"));
2211 assert!(models.iter().any(|model| model.id == "gpt-5.5"));
2212 assert!(models.iter().any(|model| model.id == "gpt-5.5-fast"));
2213 assert!(!models.iter().any(|model| model.id == "gpt-5.4-pro"));
2214 }
2215
2216 #[tokio::test]
2217 async fn omits_gpt_5_5_from_api_key_model_listing_for_now() {
2218 let provider = OpenAiCodexProvider::from_api_key("test-key".to_string());
2219 let models = provider
2220 .list_models()
2221 .await
2222 .expect("model listing should succeed");
2223
2224 assert!(!models.iter().any(|model| model.id == "gpt-5.5"));
2225 assert!(!models.iter().any(|model| model.id == "gpt-5.5-fast"));
2226 }
2227
#[test]
fn rejects_pro_model_for_chatgpt_backend() {
    let provider = OpenAiCodexProvider::new();
    let message = provider
        .validate_model_for_backend("gpt-5.4-pro")
        .expect_err("chatgpt backend should reject unsupported model")
        .to_string();
    assert!(message.contains("not supported when using Codex with a ChatGPT account"));
}
2239
#[test]
fn allows_pro_model_for_api_key_backend() {
    let api_key_provider = OpenAiCodexProvider::from_api_key(String::from("test-key"));
    let outcome = api_key_provider.validate_model_for_backend("gpt-5.4-pro");
    outcome.expect("api key backend should allow pro model");
}
2247
#[test]
fn allows_fast_alias_for_chatgpt_backend() {
    let provider = OpenAiCodexProvider::new();
    let cases = [
        ("gpt-5.4-fast:high", "chatgpt backend should allow fast alias"),
        ("gpt-5.5-fast:high", "chatgpt backend should allow GPT-5.5 fast alias"),
    ];
    for (model, failure_message) in cases {
        provider
            .validate_model_for_backend(model)
            .expect(failure_message);
    }
}
2258
2259 #[test]
2260 fn extracts_chatgpt_account_id_from_jwt_claims() {
2261 let header = URL_SAFE_NO_PAD.encode(r#"{"alg":"none","typ":"JWT"}"#);
2262 let payload = URL_SAFE_NO_PAD
2263 .encode(r#"{"https://api.openai.com/auth":{"chatgpt_account_id":"org_test123"}}"#);
2264 let jwt = format!("{header}.{payload}.sig");
2265
2266 let account_id = OpenAiCodexProvider::extract_chatgpt_account_id(&jwt);
2267 assert_eq!(account_id.as_deref(), Some("org_test123"));
2268 }
2269
2270 #[test]
2271 fn extracts_chatgpt_account_id_from_organizations_claim() {
2272 let header = URL_SAFE_NO_PAD.encode(r#"{"alg":"none","typ":"JWT"}"#);
2273 let payload = URL_SAFE_NO_PAD.encode(r#"{"organizations":[{"id":"org_from_list"}]}"#);
2274 let jwt = format!("{header}.{payload}.sig");
2275
2276 let account_id = OpenAiCodexProvider::extract_chatgpt_account_id(&jwt);
2277 assert_eq!(account_id.as_deref(), Some("org_from_list"));
2278 }
2279
2280 #[test]
2281 fn extracts_responses_instructions_from_system_messages() {
2282 let messages = vec![
2283 Message {
2284 role: Role::System,
2285 content: vec![ContentPart::Text {
2286 text: "first system block".to_string(),
2287 }],
2288 },
2289 Message {
2290 role: Role::User,
2291 content: vec![ContentPart::Text {
2292 text: "user message".to_string(),
2293 }],
2294 },
2295 Message {
2296 role: Role::System,
2297 content: vec![ContentPart::Text {
2298 text: "second system block".to_string(),
2299 }],
2300 },
2301 ];
2302
2303 let instructions = OpenAiCodexProvider::extract_responses_instructions(&messages);
2304 assert_eq!(instructions, "first system block\n\nsecond system block");
2305 }
2306
2307 #[test]
2308 fn falls_back_to_default_responses_instructions_without_system_message() {
2309 let messages = vec![Message {
2310 role: Role::User,
2311 content: vec![ContentPart::Text {
2312 text: "only user".to_string(),
2313 }],
2314 }];
2315
2316 let instructions = OpenAiCodexProvider::extract_responses_instructions(&messages);
2317 assert_eq!(instructions, DEFAULT_RESPONSES_INSTRUCTIONS);
2318 }
2319
2320 #[test]
2321 fn responses_input_ignores_system_messages() {
2322 let messages = vec![
2323 Message {
2324 role: Role::System,
2325 content: vec![ContentPart::Text {
2326 text: "system".to_string(),
2327 }],
2328 },
2329 Message {
2330 role: Role::User,
2331 content: vec![ContentPart::Text {
2332 text: "user".to_string(),
2333 }],
2334 },
2335 ];
2336
2337 let input = OpenAiCodexProvider::convert_messages_to_responses_input(&messages);
2338 assert_eq!(input.len(), 1);
2339 assert_eq!(input[0].get("role").and_then(Value::as_str), Some("user"));
2340 }
2341
2342 #[test]
2343 fn responses_ws_request_uses_bearer_auth() {
2344 let request = OpenAiCodexProvider::build_responses_ws_request_with_base_url(
2345 "wss://example.com/v1/responses",
2346 "test-token",
2347 )
2348 .expect("request should build");
2349
2350 assert_eq!(request.uri().to_string(), "wss://example.com/v1/responses");
2351 assert_eq!(
2352 request
2353 .headers()
2354 .get("Authorization")
2355 .and_then(|v| v.to_str().ok()),
2356 Some("Bearer test-token")
2357 );
2358 assert_eq!(
2359 request
2360 .headers()
2361 .get("User-Agent")
2362 .and_then(|v| v.to_str().ok()),
2363 Some("codetether-responses-ws/1.0")
2364 );
2365 }
2366
2367 #[test]
2368 fn responses_ws_request_includes_chatgpt_account_id_when_provided() {
2369 let request = OpenAiCodexProvider::build_responses_ws_request_with_base_url_and_account_id(
2370 "wss://example.com/v1/responses",
2371 "test-token",
2372 Some("org_123"),
2373 )
2374 .expect("request should build");
2375
2376 assert_eq!(
2377 request
2378 .headers()
2379 .get("chatgpt-account-id")
2380 .and_then(|v| v.to_str().ok()),
2381 Some("org_123")
2382 );
2383 }
2384
2385 #[test]
2386 fn builds_responses_ws_create_event_for_tools() {
2387 let request = CompletionRequest {
2388 messages: vec![
2389 Message {
2390 role: Role::System,
2391 content: vec![ContentPart::Text {
2392 text: "System prompt".to_string(),
2393 }],
2394 },
2395 Message {
2396 role: Role::User,
2397 content: vec![ContentPart::Text {
2398 text: "Inspect the repo".to_string(),
2399 }],
2400 },
2401 ],
2402 tools: vec![ToolDefinition {
2403 name: "read".to_string(),
2404 description: "Read a file".to_string(),
2405 parameters: json!({
2406 "type": "object",
2407 "properties": {
2408 "path": { "type": "string" }
2409 },
2410 "required": ["path"]
2411 }),
2412 }],
2413 model: "gpt-5.4".to_string(),
2414 temperature: None,
2415 top_p: None,
2416 max_tokens: Some(8192),
2417 stop: Vec::new(),
2418 };
2419
2420 let event = OpenAiCodexProvider::build_responses_ws_create_event(
2421 &request,
2422 "gpt-5.4",
2423 Some(ThinkingLevel::High),
2424 None,
2425 );
2426
2427 assert_eq!(
2428 event.get("type").and_then(Value::as_str),
2429 Some("response.create")
2430 );
2431 assert_eq!(event.get("model").and_then(Value::as_str), Some("gpt-5.4"));
2432 assert_eq!(event.get("store").and_then(Value::as_bool), Some(false));
2433 assert_eq!(
2434 event.get("max_output_tokens").and_then(Value::as_u64),
2435 Some(8192)
2436 );
2437 assert_eq!(
2438 event.get("tools").and_then(Value::as_array).map(Vec::len),
2439 Some(1)
2440 );
2441 }
2442
    /// Exercises `OpenAiRealtimeConnection` end to end against an in-process WebSocket server.
    /// The client handshake must reach `/v1/responses` with bearer auth, and a JSON event sent
    /// by the client must arrive as a text frame. The server's JSON reply must then be decoded
    /// by `recv_event`.
    #[tokio::test]
    async fn responses_ws_connection_round_trips_json_events() {
        // In-memory byte pipe (16 KiB buffer) used instead of a real network socket.
        let (client_io, server_io) = duplex(16 * 1024);

        // Fake server: validate the upgrade request, read one `response.create` text frame,
        // then answer with a single `response.created` event.
        let server = tokio::spawn(async move {
            // Handshake hook: assert on the request path and Authorization header, then accept
            // with the response unchanged.
            let callback = |request: &ServerRequest, response: ServerResponse| {
                assert_eq!(request.uri().path(), "/v1/responses");
                assert_eq!(
                    request
                        .headers()
                        .get("Authorization")
                        .and_then(|v| v.to_str().ok()),
                    Some("Bearer test-token")
                );
                Ok(response)
            };

            let mut socket = accept_hdr_async(server_io, callback)
                .await
                .expect("server websocket handshake should succeed");
            let message = socket
                .next()
                .await
                .expect("server should receive message")
                .expect("server message should be valid");
            // The client's event must arrive as a text frame containing JSON.
            match message {
                WsMessage::Text(text) => {
                    let event: Value =
                        serde_json::from_str(&text).expect("server should parse JSON event");
                    assert_eq!(
                        event.get("type").and_then(Value::as_str),
                        Some("response.create")
                    );
                }
                other => panic!("expected text frame, got {other:?}"),
            }

            socket
                .send(WsMessage::Text(
                    json!({ "type": "response.created" }).to_string().into(),
                ))
                .await
                .expect("server should send response");
        });

        // Client side: the handshake runs over `client_io`, so the URL's host is never
        // resolved. Only its path and the request headers reach the fake server.
        let request = OpenAiCodexProvider::build_responses_ws_request_with_base_url(
            "ws://localhost/v1/responses",
            "test-token",
        )
        .expect("client request should build");
        let (stream, _) = client_async(request, client_io)
            .await
            .expect("client websocket handshake should succeed");
        let mut connection = OpenAiRealtimeConnection::new(stream);

        connection
            .send_event(&json!({ "type": "response.create", "model": "gpt-5.4", "input": [] }))
            .await
            .expect("client should send event");
        // NOTE(review): the double `expect` suggests `recv_event` returns something like
        // `Result<Option<Value>>`. The second `expect` would then reject an end-of-stream
        // `None`. Confirm against the definition.
        let event = connection
            .recv_event()
            .await
            .expect("client should read event")
            .expect("client should receive session event");
        assert_eq!(
            event.get("type").and_then(Value::as_str),
            Some("response.created")
        );
        // NOTE(review): the server task can finish (dropping its end of the pipe) before this
        // runs. The test therefore assumes `close()` tolerates an already-gone peer. Confirm.
        connection
            .close()
            .await
            .expect("client should close cleanly");

        // Join the server task so a panic from its assertions also fails this test.
        server.await.expect("server task should finish");
    }
2518
    /// The SSE parser must buffer input across reads. An event whose `data:` field is split
    /// between reads (`da` | `ta:`) is still decoded once the rest arrives. Argument deltas
    /// keyed by the item id (`fc_1`) are reported under the tool call's `call_id` (`call_1`).
    #[test]
    fn responses_sse_parser_buffers_split_tool_call_events() {
        let mut parser = ResponsesSseParser::default();

        // First read: one complete `output_item.added` event, then a dangling `da` fragment
        // that begins the next event's `data:` line.
        let chunk1 = br#"data: {"type":"response.output_item.added","item":{"type":"function_call","id":"fc_1","call_id":"call_1","name":"bash"}}

da"#;
        let first = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, chunk1);
        // Only the complete event is emitted; the fragment stays buffered in `parser`.
        assert_eq!(first.len(), 1);
        match &first[0] {
            StreamChunk::ToolCallStart { id, name } => {
                assert_eq!(id, "call_1");
                assert_eq!(name, "bash");
            }
            other => panic!("expected tool start, got {other:?}"),
        }

        // Second read finishes the split `data:` line. It then delivers two argument deltas,
        // the item-done event and `response.completed`.
        let chunk2 = br#"ta: {"type":"response.function_call_arguments.delta","item_id":"fc_1","delta":"{\"command\":"}

data: {"type":"response.function_call_arguments.delta","item_id":"fc_1","delta":"\"ls\"}"}

data: {"type":"response.output_item.done","item":{"type":"function_call","id":"fc_1","call_id":"call_1","name":"bash","arguments":"{\"command\":\"ls\"}"}}

data: {"type":"response.completed","response":{"usage":{"input_tokens":11,"output_tokens":7,"total_tokens":18}}}

"#;
        let second = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, chunk2);

        // Four chunks: two deltas, one end, one done. The item-done event adds only
        // `ToolCallEnd` (its arguments were already streamed as deltas), and
        // `response.completed` becomes `Done` with usage.
        assert_eq!(second.len(), 4);
        match &second[0] {
            StreamChunk::ToolCallDelta {
                id,
                arguments_delta,
            } => {
                assert_eq!(id, "call_1");
                assert_eq!(arguments_delta, "{\"command\":");
            }
            other => panic!("expected first tool delta, got {other:?}"),
        }
        match &second[1] {
            StreamChunk::ToolCallDelta {
                id,
                arguments_delta,
            } => {
                assert_eq!(id, "call_1");
                assert_eq!(arguments_delta, "\"ls\"}");
            }
            other => panic!("expected second tool delta, got {other:?}"),
        }
        match &second[2] {
            StreamChunk::ToolCallEnd { id } => assert_eq!(id, "call_1"),
            other => panic!("expected tool end, got {other:?}"),
        }
        // Usage mapping: input_tokens -> prompt_tokens, output_tokens -> completion_tokens,
        // total_tokens -> total_tokens.
        match &second[3] {
            StreamChunk::Done { usage } => {
                let usage = usage.as_ref().expect("expected usage");
                assert_eq!(usage.prompt_tokens, 11);
                assert_eq!(usage.completion_tokens, 7);
                assert_eq!(usage.total_tokens, 18);
            }
            other => panic!("expected done, got {other:?}"),
        }
    }
2582
2583 #[test]
2584 fn responses_sse_parser_falls_back_to_done_item_arguments() {
2585 let mut parser = ResponsesSseParser::default();
2586 let bytes = br#"data: {"type":"response.output_item.done","item":{"type":"function_call","id":"fc_2","call_id":"call_2","name":"read","arguments":"{\"path\":\"src/main.rs\"}"}}
2587
2588"#;
2589
2590 let chunks = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, bytes);
2591 assert_eq!(chunks.len(), 3);
2592 match &chunks[0] {
2593 StreamChunk::ToolCallStart { id, name } => {
2594 assert_eq!(id, "call_2");
2595 assert_eq!(name, "read");
2596 }
2597 other => panic!("expected tool start, got {other:?}"),
2598 }
2599 match &chunks[1] {
2600 StreamChunk::ToolCallDelta {
2601 id,
2602 arguments_delta,
2603 } => {
2604 assert_eq!(id, "call_2");
2605 assert_eq!(arguments_delta, "{\"path\":\"src/main.rs\"}");
2606 }
2607 other => panic!("expected tool delta, got {other:?}"),
2608 }
2609 match &chunks[2] {
2610 StreamChunk::ToolCallEnd { id } => assert_eq!(id, "call_2"),
2611 other => panic!("expected tool end, got {other:?}"),
2612 }
2613 }
2614
2615 #[test]
2616 fn responses_sse_parser_flushes_final_event_without_trailing_blank_line() {
2617 let mut parser = ResponsesSseParser::default();
2618 let bytes = br#"data: {"type":"response.output_item.done","item":{"type":"function_call","id":"fc_3","call_id":"call_3","name":"read","arguments":"{\"path\":\"src/lib.rs\"}"}}"#;
2619
2620 let first = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, bytes);
2621 assert!(first.is_empty());
2622
2623 let flushed = OpenAiCodexProvider::finish_responses_sse_parser(&mut parser);
2624 assert_eq!(flushed.len(), 3);
2625 match &flushed[0] {
2626 StreamChunk::ToolCallStart { id, name } => {
2627 assert_eq!(id, "call_3");
2628 assert_eq!(name, "read");
2629 }
2630 other => panic!("expected tool start, got {other:?}"),
2631 }
2632 match &flushed[1] {
2633 StreamChunk::ToolCallDelta {
2634 id,
2635 arguments_delta,
2636 } => {
2637 assert_eq!(id, "call_3");
2638 assert_eq!(arguments_delta, "{\"path\":\"src/lib.rs\"}");
2639 }
2640 other => panic!("expected tool delta, got {other:?}"),
2641 }
2642 match &flushed[2] {
2643 StreamChunk::ToolCallEnd { id } => assert_eq!(id, "call_3"),
2644 other => panic!("expected tool end, got {other:?}"),
2645 }
2646 }
2647
2648 #[tokio::test]
2649 async fn collect_stream_completion_updates_tool_name_after_early_delta() {
2650 let stream = stream::iter(vec![
2651 StreamChunk::ToolCallDelta {
2652 id: "call_4".to_string(),
2653 arguments_delta: "{\"path\":\"src/provider/openai_codex.rs\"}".to_string(),
2654 },
2655 StreamChunk::ToolCallStart {
2656 id: "call_4".to_string(),
2657 name: "read".to_string(),
2658 },
2659 StreamChunk::Done { usage: None },
2660 ]);
2661
2662 let response = OpenAiCodexProvider::collect_stream_completion(Box::pin(stream))
2663 .await
2664 .expect("stream completion should succeed");
2665
2666 assert!(matches!(
2667 response.message.content.first(),
2668 Some(ContentPart::ToolCall { id, name, arguments, .. })
2669 if id == "call_4"
2670 && name == "read"
2671 && arguments == "{\"path\":\"src/provider/openai_codex.rs\"}"
2672 ));
2673 }
2674}