1use super::{
9 CompletionRequest, CompletionResponse, ContentPart, FinishReason, Message, ModelInfo, Provider,
10 Role, StreamChunk, ToolDefinition, Usage,
11};
12use anyhow::{Context, Result};
13use async_trait::async_trait;
14use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
15use futures::stream::BoxStream;
16use futures::{SinkExt, StreamExt};
17use reqwest::{Client, StatusCode};
18use serde::{Deserialize, Serialize};
19use serde_json::{Value, json};
20use sha2::{Digest, Sha256};
21use std::collections::HashMap;
22use std::io::ErrorKind;
23use std::sync::Arc;
24use tokio::io::{AsyncRead, AsyncWrite};
25use tokio::net::TcpStream;
26use tokio::sync::RwLock;
27use tokio_tungstenite::{
28 MaybeTlsStream, WebSocketStream, connect_async,
29 tungstenite::{
30 Error as WsError, Message as WsMessage,
31 client::IntoClientRequest,
32 http::{HeaderValue, Request},
33 },
34};
35
// REST endpoints for the two supported backends (API-key vs ChatGPT OAuth).
const OPENAI_API_URL: &str = "https://api.openai.com/v1";
const CHATGPT_CODEX_API_URL: &str = "https://chatgpt.com/backend-api/codex";
// WebSocket endpoints for the streaming Responses API, one per backend.
const OPENAI_RESPONSES_WS_URL: &str = "wss://api.openai.com/v1/responses";
const CHATGPT_CODEX_RESPONSES_WS_URL: &str = "wss://chatgpt.com/backend-api/codex/responses";
// OAuth (authorization-code + PKCE) endpoints and client registration.
const AUTH_ISSUER: &str = "https://auth.openai.com";
const AUTHORIZE_URL: &str = "https://auth.openai.com/oauth/authorize";
const TOKEN_URL: &str = "https://auth.openai.com/oauth/token";
const CLIENT_ID: &str = "app_EMoamEEZ73f0CkXaXp7hrann";
// Loopback redirect used by the local OAuth callback listener.
const REDIRECT_URI: &str = "http://localhost:1455/auth/callback";
const SCOPE: &str = "openid profile email offline_access";
// Environment variables that override the reasoning-effort level.
const THINKING_LEVEL_ENV: &str = "CODETETHER_OPENAI_CODEX_THINKING_LEVEL";
const REASONING_EFFORT_ENV: &str = "CODETETHER_OPENAI_CODEX_REASONING_EFFORT";
// Fallback system instructions used when a request carries no System messages.
const DEFAULT_RESPONSES_INSTRUCTIONS: &str = "You are CodeTether Agent running on OpenAI Codex. \
Resolve software tasks directly: inspect the workspace, make focused changes, validate with \
available tools, and report concise results. When model availability or external APIs are involved, \
verify live behavior before treating it as supported.";
52
/// Reasoning-effort level attached to a model request.
///
/// Parsed from a `model:level` suffix or from the environment; see
/// `ThinkingLevel::parse` for the accepted spellings.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ThinkingLevel {
    // Explicitly disables reasoning ("none").
    NoReasoning,
    Low,
    Medium,
    High,
    // Maximum effort ("xhigh").
    XHigh,
}
61
/// Which backend a Responses WebSocket connection should target.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ResponsesWsBackend {
    // Standard OpenAI API (API-key auth).
    OpenAi,
    // ChatGPT Codex backend (OAuth / ChatGPT account auth).
    ChatGptCodex,
}
67
/// Service tier requested via a `-fast` model alias; serialized into the
/// request payload by `apply_service_tier`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CodexServiceTier {
    Priority,
}
72
73impl ThinkingLevel {
74 fn parse(raw: &str) -> Option<Self> {
75 match raw.trim().to_ascii_lowercase().as_str() {
76 "none" => Some(Self::NoReasoning),
77 "low" => Some(Self::Low),
78 "medium" => Some(Self::Medium),
79 "high" => Some(Self::High),
80 "xhigh" => Some(Self::XHigh),
81 _ => None,
82 }
83 }
84
85 fn as_str(self) -> &'static str {
86 match self {
87 Self::NoReasoning => "none",
88 Self::Low => "low",
89 Self::Medium => "medium",
90 Self::High => "high",
91 Self::XHigh => "xhigh",
92 }
93 }
94}
95
impl CodexServiceTier {
    /// Wire value sent in the request's `service_tier` field.
    fn as_str(self) -> &'static str {
        match self {
            Self::Priority => "priority",
        }
    }
}
103
/// In-memory cache entry for a resolved OAuth access token.
///
/// `expires_at` is an `Instant` (monotonic clock) so cache validity is not
/// affected by wall-clock adjustments after the entry is created.
#[allow(dead_code)]
struct CachedTokens {
    access_token: String,
    #[allow(dead_code)]
    refresh_token: String,
    expires_at: std::time::Instant,
}
112
/// Persisted OAuth credential set for the Codex provider.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OAuthCredentials {
    // OpenID Connect id_token, when the token endpoint returned one.
    #[serde(default)]
    pub id_token: Option<String>,
    // ChatGPT workspace/account id, when known at save time; otherwise it is
    // recovered from the JWTs (see `from_credentials`).
    #[serde(default)]
    pub chatgpt_account_id: Option<String>,
    pub access_token: String,
    pub refresh_token: String,
    // Expiry as seconds since the UNIX epoch.
    pub expires_at: u64,
}
124
/// PKCE verifier/challenge pair for the OAuth authorization-code flow
/// (S256 challenge method; see `generate_pkce`).
struct PkcePair {
    verifier: String,
    challenge: String,
}
130
/// Per-tool-call accumulation state while streaming Responses events.
#[derive(Debug, Default)]
struct ResponsesToolState {
    call_id: String,
    name: Option<String>,
    // Whether a start chunk has been emitted for this call.
    started: bool,
    // Whether the call has been finalized.
    finished: bool,
    // Argument text already forwarded downstream, to avoid re-emitting.
    emitted_arguments: String,
}
139
/// Incremental parser state for the Responses server-sent-event stream.
#[derive(Debug, Default)]
struct ResponsesSseParser {
    // Partial line carried over between network reads.
    line_buffer: String,
    // `data:` lines collected for the event currently being assembled.
    event_data_lines: Vec<String>,
    // Tool-call state keyed by call id.
    tools: HashMap<String, ResponsesToolState>,
}
146
/// A live Responses/Realtime WebSocket connection.
///
/// Generic over the underlying transport so tests can substitute an
/// in-memory stream; production code uses `MaybeTlsStream<TcpStream>`.
pub struct OpenAiRealtimeConnection<S = MaybeTlsStream<TcpStream>> {
    stream: WebSocketStream<S>,
}
150
// Manual Debug: the inner WebSocket stream is neither Debug nor useful to
// print, so only the type name is shown.
impl<S> std::fmt::Debug for OpenAiRealtimeConnection<S> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("OpenAiRealtimeConnection").finish()
    }
}
156
impl<S> OpenAiRealtimeConnection<S>
where
    S: AsyncRead + AsyncWrite + Unpin,
{
    /// Wraps an already-established WebSocket stream.
    fn new(stream: WebSocketStream<S>) -> Self {
        Self { stream }
    }

    /// Serializes `event` to JSON and sends it as a text frame.
    pub async fn send_event(&mut self, event: &Value) -> Result<()> {
        let payload = serde_json::to_string(event).context("Failed to serialize Realtime event")?;
        self.stream
            .send(WsMessage::Text(payload.into()))
            .await
            .context("Failed to send Realtime event")?;
        Ok(())
    }

    /// Receives the next JSON event from the peer.
    ///
    /// Text and binary frames are parsed as JSON and returned. Ping frames
    /// are answered with a pong and skipped; pong and raw frames are
    /// ignored. Returns `Ok(None)` when the peer sends a close frame or
    /// the stream ends.
    pub async fn recv_event(&mut self) -> Result<Option<Value>> {
        while let Some(message) = self.stream.next().await {
            let message = message.context("Realtime WebSocket receive failed")?;
            match message {
                WsMessage::Text(text) => {
                    let event = serde_json::from_str(&text)
                        .context("Failed to parse Realtime text event")?;
                    return Ok(Some(event));
                }
                WsMessage::Binary(bytes) => {
                    // Some servers deliver JSON payloads as binary frames.
                    let text = String::from_utf8(bytes.to_vec())
                        .context("Realtime binary event was not valid UTF-8")?;
                    let event = serde_json::from_str(&text)
                        .context("Failed to parse Realtime binary event")?;
                    return Ok(Some(event));
                }
                WsMessage::Ping(payload) => {
                    // Keep the connection alive, then continue waiting for
                    // a real event.
                    self.stream
                        .send(WsMessage::Pong(payload))
                        .await
                        .context("Failed to respond to Realtime ping")?;
                }
                WsMessage::Pong(_) => {}
                WsMessage::Frame(_) => {}
                WsMessage::Close(_) => return Ok(None),
            }
        }

        Ok(None)
    }

    /// Sends a close frame, treating "already closed"-style conditions as
    /// success so idempotent shutdown paths don't error.
    pub async fn close(&mut self) -> Result<()> {
        match self.stream.send(WsMessage::Close(None)).await {
            Ok(()) => {}
            // The peer may have closed first; not a failure.
            Err(WsError::ConnectionClosed) | Err(WsError::AlreadyClosed) => {}
            // Likewise for transport-level disconnects during the close.
            Err(WsError::Io(err))
                if matches!(
                    err.kind(),
                    ErrorKind::BrokenPipe
                        | ErrorKind::ConnectionReset
                        | ErrorKind::ConnectionAborted
                        | ErrorKind::NotConnected
                ) => {}
            Err(err) => return Err(err).context("Failed to close Realtime WebSocket"),
        }
        Ok(())
    }
}
222
/// Provider backed by OpenAI Codex, supporting either a static API key or
/// ChatGPT OAuth credentials (with in-memory token caching and refresh).
pub struct OpenAiCodexProvider {
    client: Client,
    // Cache of the most recently resolved OAuth access token.
    cached_tokens: Arc<RwLock<Option<CachedTokens>>>,
    // When set, OAuth is bypassed entirely (see `get_access_token`).
    static_api_key: Option<String>,
    chatgpt_account_id: Option<String>,
    stored_credentials: Option<OAuthCredentials>,
}
231
// Manual Debug that reveals only presence flags, never secret material
// (API key, tokens).
impl std::fmt::Debug for OpenAiCodexProvider {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("OpenAiCodexProvider")
            .field("has_api_key", &self.static_api_key.is_some())
            .field("has_chatgpt_account_id", &self.chatgpt_account_id.is_some())
            .field("has_credentials", &self.stored_credentials.is_some())
            .finish()
    }
}
241
// Default is an unauthenticated provider; credentials must be supplied
// before requests can succeed.
impl Default for OpenAiCodexProvider {
    fn default() -> Self {
        Self::new()
    }
}
247
248impl OpenAiCodexProvider {
249 fn chatgpt_supported_models() -> &'static [&'static str] {
250 &[
251 "gpt-5",
252 "gpt-5-mini",
253 "gpt-5.1-codex",
254 "gpt-5.2",
255 "gpt-5.3-codex",
256 "gpt-5.4",
257 "gpt-5.4-fast",
258 "gpt-5.5",
259 "gpt-5.5-fast",
260 "o3",
261 "o4-mini",
262 ]
263 }
264
    /// Returns whether the active backend can serve `model`. The API-key
    /// backend accepts anything; the ChatGPT backend is allow-listed.
    fn model_is_supported_by_backend(&self, model: &str) -> bool {
        if !self.using_chatgpt_backend() {
            return true;
        }
        Self::chatgpt_supported_models().contains(&model)
    }
271
    /// Fails with a helpful error when the active backend cannot serve
    /// `model`.
    ///
    /// Both the raw model string and its resolved base form (thinking-level
    /// suffix and `-fast` alias stripped) are checked, so "gpt-5:high"
    /// passes whenever "gpt-5" is supported.
    fn validate_model_for_backend(&self, model: &str) -> Result<()> {
        let (resolved_model, _, _) =
            Self::resolve_model_and_reasoning_effort_and_service_tier(model);
        if self.model_is_supported_by_backend(model)
            || self.model_is_supported_by_backend(&resolved_model)
        {
            return Ok(());
        }

        if self.using_chatgpt_backend() {
            anyhow::bail!(
                "Model '{}' is not supported when using Codex with a ChatGPT account. Supported models: {}",
                model,
                Self::chatgpt_supported_models().join(", ")
            );
        }

        Ok(())
    }
291
    /// Builds a provider that authenticates with a static OpenAI API key
    /// (no OAuth, no ChatGPT account association).
    pub fn from_api_key(api_key: String) -> Self {
        Self {
            client: crate::provider::shared_http::shared_client().clone(),
            cached_tokens: Arc::new(RwLock::new(None)),
            static_api_key: Some(api_key),
            chatgpt_account_id: None,
            stored_credentials: None,
        }
    }
301
    /// Builds a provider from stored OAuth credentials.
    ///
    /// The ChatGPT account id is taken from the credentials when present,
    /// otherwise recovered from the id_token JWT, and finally from the
    /// access-token JWT.
    pub fn from_credentials(credentials: OAuthCredentials) -> Self {
        let chatgpt_account_id = credentials
            .chatgpt_account_id
            .clone()
            .or_else(|| {
                credentials
                    .id_token
                    .as_deref()
                    .and_then(Self::extract_chatgpt_account_id_from_jwt)
            })
            .or_else(|| Self::extract_chatgpt_account_id_from_jwt(&credentials.access_token));

        Self {
            client: crate::provider::shared_http::shared_client().clone(),
            cached_tokens: Arc::new(RwLock::new(None)),
            static_api_key: None,
            chatgpt_account_id,
            stored_credentials: Some(credentials),
        }
    }
323
    /// Creates an unauthenticated provider; token resolution will fail
    /// until credentials or an API key are supplied.
    #[allow(dead_code)]
    pub fn new() -> Self {
        Self {
            client: crate::provider::shared_http::shared_client().clone(),
            cached_tokens: Arc::new(RwLock::new(None)),
            static_api_key: None,
            chatgpt_account_id: None,
            stored_credentials: None,
        }
    }
335
    /// Builds a PKCE verifier/challenge pair using the S256 method
    /// (challenge = base64url(SHA-256(verifier)), RFC 7636).
    ///
    /// NOTE(review): the verifier bytes are derived from the clock and the
    /// current thread id rather than a CSPRNG, so they are predictable to a
    /// determined attacker. If authorization-code interception is in your
    /// threat model, switch to a real RNG — confirm with the security owner.
    fn generate_pkce() -> PkcePair {
        let random_bytes: [u8; 32] = {
            let timestamp = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_nanos())
                .unwrap_or(0);

            let mut bytes = [0u8; 32];
            let ts_bytes = timestamp.to_le_bytes();
            let tid = std::thread::current().id();
            let tid_repr = format!("{:?}", tid);
            let tid_hash = Sha256::digest(tid_repr.as_bytes());

            // Layout: 8 bytes raw timestamp, 16 bytes hashed thread id,
            // 8 bytes hashed timestamp.
            bytes[0..8].copy_from_slice(&ts_bytes[0..8]);
            bytes[8..24].copy_from_slice(&tid_hash[0..16]);
            bytes[24..].copy_from_slice(&Sha256::digest(ts_bytes)[0..8]);
            bytes
        };
        let verifier = URL_SAFE_NO_PAD.encode(random_bytes);

        // S256 challenge derived from the encoded verifier string.
        let mut hasher = Sha256::new();
        hasher.update(verifier.as_bytes());
        let challenge_bytes = hasher.finalize();
        let challenge = URL_SAFE_NO_PAD.encode(challenge_bytes);

        PkcePair {
            verifier,
            challenge,
        }
    }
367
368 fn generate_state() -> String {
370 let timestamp = std::time::SystemTime::now()
371 .duration_since(std::time::UNIX_EPOCH)
372 .map(|d| d.as_nanos())
373 .unwrap_or(0);
374 let random: [u8; 8] = {
375 let ptr = Box::into_raw(Box::new(timestamp)) as usize;
376 let bytes = ptr.to_le_bytes();
377 let mut arr = [0u8; 8];
378 arr.copy_from_slice(&bytes);
379 arr
380 };
381 format!("{:016x}{:016x}", timestamp, u64::from_le_bytes(random))
382 }
383
    /// Builds the browser authorization URL for the PKCE flow.
    ///
    /// Returns `(url, pkce_verifier, state)`; the caller must retain the
    /// verifier and state for the subsequent code exchange / CSRF check.
    #[allow(dead_code)]
    pub fn get_authorization_url() -> (String, String, String) {
        let pkce = Self::generate_pkce();
        let state = Self::generate_state();

        // Extra query parameters mirror the official Codex CLI flow.
        let url = format!(
            "{}?response_type=code&client_id={}&redirect_uri={}&scope={}&code_challenge={}&code_challenge_method=S256&state={}&id_token_add_organizations=true&codex_cli_simplified_flow=true&originator=codex_cli_rs",
            AUTHORIZE_URL,
            CLIENT_ID,
            urlencoding::encode(REDIRECT_URI),
            urlencoding::encode(SCOPE),
            pkce.challenge,
            state
        );

        (url, pkce.verifier, state)
    }
402
    /// OAuth issuer base URL (for discovery / device-code flows).
    pub fn oauth_issuer() -> &'static str {
        AUTH_ISSUER
    }

    /// Registered OAuth client id for this application.
    pub fn oauth_client_id() -> &'static str {
        CLIENT_ID
    }
410
411 fn responses_ws_url_with_base(base_url: &str) -> String {
412 base_url.to_string()
413 }
414
    /// Default Responses WebSocket URL (standard OpenAI backend).
    pub fn responses_ws_url() -> String {
        Self::responses_ws_url_with_base(OPENAI_RESPONSES_WS_URL)
    }
418
    /// Builds the WebSocket upgrade request for the Responses API.
    ///
    /// Adds bearer authorization, a stable User-Agent, and — when provided
    /// and non-blank — the `ChatGPT-Account-ID` header required by the
    /// ChatGPT Codex backend. Rejects empty tokens up front.
    fn build_responses_ws_request_with_base_url_and_account_id(
        base_url: &str,
        token: &str,
        chatgpt_account_id: Option<&str>,
    ) -> Result<Request<()>> {
        if token.trim().is_empty() {
            anyhow::bail!("Responses WebSocket token cannot be empty");
        }

        let url = Self::responses_ws_url_with_base(base_url);
        let mut request = url
            .into_client_request()
            .context("Failed to build Responses WebSocket request")?;
        request.headers_mut().insert(
            "Authorization",
            HeaderValue::from_str(&format!("Bearer {token}"))
                .context("Failed to build Responses Authorization header")?,
        );
        request.headers_mut().insert(
            "User-Agent",
            HeaderValue::from_static("codetether-responses-ws/1.0"),
        );
        if let Some(account_id) = chatgpt_account_id.filter(|id| !id.trim().is_empty()) {
            request.headers_mut().insert(
                "ChatGPT-Account-ID",
                HeaderValue::from_str(account_id)
                    .context("Failed to build ChatGPT account header")?,
            );
        }
        Ok(request)
    }
450
    /// Convenience wrapper: same as the full builder but without a ChatGPT
    /// account header.
    fn build_responses_ws_request_with_base_url(
        base_url: &str,
        token: &str,
    ) -> Result<Request<()>> {
        Self::build_responses_ws_request_with_base_url_and_account_id(base_url, token, None)
    }
457
    /// Builds an upgrade request against the standard OpenAI Responses
    /// WebSocket endpoint.
    fn build_responses_ws_request_with_account_id(
        token: &str,
        chatgpt_account_id: Option<&str>,
    ) -> Result<Request<()>> {
        Self::build_responses_ws_request_with_base_url_and_account_id(
            OPENAI_RESPONSES_WS_URL,
            token,
            chatgpt_account_id,
        )
    }
468
    /// Builds an upgrade request against the ChatGPT Codex Responses
    /// WebSocket endpoint.
    fn build_chatgpt_responses_ws_request(
        token: &str,
        chatgpt_account_id: Option<&str>,
    ) -> Result<Request<()>> {
        Self::build_responses_ws_request_with_base_url_and_account_id(
            CHATGPT_CODEX_RESPONSES_WS_URL,
            token,
            chatgpt_account_id,
        )
    }
479
    /// Opens a Responses WebSocket with an explicit token, account id, and
    /// backend selection.
    async fn connect_responses_ws_with_token(
        &self,
        token: &str,
        chatgpt_account_id: Option<&str>,
        backend: ResponsesWsBackend,
    ) -> Result<OpenAiRealtimeConnection> {
        let request = match backend {
            ResponsesWsBackend::OpenAi => {
                Self::build_responses_ws_request_with_account_id(token, chatgpt_account_id)?
            }
            ResponsesWsBackend::ChatGptCodex => {
                Self::build_chatgpt_responses_ws_request(token, chatgpt_account_id)?
            }
        };
        let (stream, _response) = connect_async(request)
            .await
            .context("Failed to connect to OpenAI Responses WebSocket")?;
        Ok(OpenAiRealtimeConnection::new(stream))
    }
499
    /// Opens a Responses WebSocket using this provider's own credentials,
    /// picking the backend that matches the auth mode (OAuth → ChatGPT
    /// Codex, API key → standard OpenAI).
    pub async fn connect_responses_ws(&self) -> Result<OpenAiRealtimeConnection> {
        let token = self.get_access_token().await?;
        let account_id = self.resolved_chatgpt_account_id(&token);
        let backend = if self.using_chatgpt_backend() {
            ResponsesWsBackend::ChatGptCodex
        } else {
            ResponsesWsBackend::OpenAi
        };
        self.connect_responses_ws_with_token(&token, account_id.as_deref(), backend)
            .await
    }
511
    /// Exchanges an authorization code using the default loopback redirect
    /// URI.
    #[allow(dead_code)]
    pub async fn exchange_code(code: &str, verifier: &str) -> Result<OAuthCredentials> {
        Self::exchange_code_with_redirect_uri(code, verifier, REDIRECT_URI).await
    }
517
    /// Exchanges an OAuth authorization code (plus PKCE verifier) for
    /// tokens at the token endpoint.
    ///
    /// On success, also mines the id_token for the ChatGPT account id so it
    /// can be persisted alongside the credentials. Errors on transport
    /// failure, non-2xx status, or an unparsable token response.
    pub async fn exchange_code_with_redirect_uri(
        code: &str,
        verifier: &str,
        redirect_uri: &str,
    ) -> Result<OAuthCredentials> {
        let client = crate::provider::shared_http::shared_client().clone();
        // Form-encoded body per the OAuth 2.0 token endpoint spec.
        let form_body = format!(
            "grant_type={}&client_id={}&code={}&code_verifier={}&redirect_uri={}",
            urlencoding::encode("authorization_code"),
            CLIENT_ID,
            urlencoding::encode(code),
            urlencoding::encode(verifier),
            urlencoding::encode(redirect_uri),
        );

        let response = client
            .post(TOKEN_URL)
            .header("Content-Type", "application/x-www-form-urlencoded")
            .body(form_body)
            .send()
            .await
            .context("Failed to exchange authorization code")?;

        if !response.status().is_success() {
            let body = response.text().await.unwrap_or_default();
            anyhow::bail!("OAuth token exchange failed: {}", body);
        }

        #[derive(Deserialize)]
        struct TokenResponse {
            #[serde(default)]
            id_token: Option<String>,
            access_token: String,
            refresh_token: String,
            expires_in: u64,
        }

        let tokens: TokenResponse = response
            .json()
            .await
            .context("Failed to parse token response")?;

        // Convert relative lifetime into an absolute UNIX-epoch expiry.
        let expires_at = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .context("System time error")?
            .as_secs()
            + tokens.expires_in;

        let chatgpt_account_id = tokens
            .id_token
            .as_deref()
            .and_then(Self::extract_chatgpt_account_id_from_jwt);

        Ok(OAuthCredentials {
            id_token: tokens.id_token,
            chatgpt_account_id,
            access_token: tokens.access_token,
            refresh_token: tokens.refresh_token,
            expires_at,
        })
    }
580
    /// Trades an OpenID Connect id_token for a plain OpenAI API key via the
    /// RFC 8693 token-exchange grant.
    ///
    /// Errors on transport failure, non-2xx status, unparsable response,
    /// or an empty access token in the payload.
    pub async fn exchange_id_token_for_api_key(id_token: &str) -> Result<String> {
        #[derive(Deserialize)]
        struct ExchangeResponse {
            access_token: String,
        }

        let client = crate::provider::shared_http::shared_client().clone();
        let form_body = format!(
            "grant_type={}&client_id={}&requested_token={}&subject_token={}&subject_token_type={}",
            urlencoding::encode("urn:ietf:params:oauth:grant-type:token-exchange"),
            urlencoding::encode(CLIENT_ID),
            urlencoding::encode("openai-api-key"),
            urlencoding::encode(id_token),
            urlencoding::encode("urn:ietf:params:oauth:token-type:id_token"),
        );

        let response = client
            .post(TOKEN_URL)
            .header("Content-Type", "application/x-www-form-urlencoded")
            .body(form_body)
            .send()
            .await
            .context("Failed to exchange id_token for OpenAI API key")?;

        let status = response.status();
        if !status.is_success() {
            let body = response.text().await.unwrap_or_default();
            anyhow::bail!("API key token exchange failed ({}): {}", status, body);
        }

        let payload: ExchangeResponse = response
            .json()
            .await
            .context("Failed to parse API key token exchange response")?;
        // Guard against a 2xx response that still carries no usable key.
        if payload.access_token.trim().is_empty() {
            anyhow::bail!("API key token exchange returned an empty access token");
        }
        Ok(payload.access_token)
    }
620
    /// Refreshes the OAuth access token using the stored refresh token.
    ///
    /// Returns a new credential set with an absolute expiry. Note that the
    /// refreshed credentials carry `id_token: None` — the original id_token
    /// is intentionally not preserved here; the account id is carried over
    /// from the provider instead.
    async fn refresh_access_token(&self, refresh_token: &str) -> Result<OAuthCredentials> {
        let form_body = format!(
            "grant_type={}&refresh_token={}&client_id={}",
            urlencoding::encode("refresh_token"),
            urlencoding::encode(refresh_token),
            CLIENT_ID,
        );

        let response = self
            .client
            .post(TOKEN_URL)
            .header("Content-Type", "application/x-www-form-urlencoded")
            .body(form_body)
            .send()
            .await
            .context("Failed to refresh access token")?;

        if !response.status().is_success() {
            let body = response.text().await.unwrap_or_default();
            anyhow::bail!("Token refresh failed: {}", body);
        }

        #[derive(Deserialize)]
        struct TokenResponse {
            access_token: String,
            refresh_token: String,
            expires_in: u64,
        }

        let tokens: TokenResponse = response
            .json()
            .await
            .context("Failed to parse refresh response")?;

        // Convert relative lifetime into an absolute UNIX-epoch expiry.
        let expires_at = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .context("System time error")?
            .as_secs()
            + tokens.expires_in;

        Ok(OAuthCredentials {
            id_token: None,
            chatgpt_account_id: self.chatgpt_account_id.clone(),
            access_token: tokens.access_token,
            refresh_token: tokens.refresh_token,
            expires_at,
        })
    }
670
671 async fn get_access_token(&self) -> Result<String> {
673 if let Some(ref api_key) = self.static_api_key {
674 return Ok(api_key.clone());
675 }
676
677 {
678 let cache = self.cached_tokens.read().await;
679 if let Some(ref tokens) = *cache
680 && tokens
681 .expires_at
682 .duration_since(std::time::Instant::now())
683 .as_secs()
684 > 300
685 {
686 return Ok(tokens.access_token.clone());
687 }
688 }
689
690 let mut cache = self.cached_tokens.write().await;
691
692 let creds = if let Some(ref stored) = self.stored_credentials {
693 let now = std::time::SystemTime::now()
694 .duration_since(std::time::UNIX_EPOCH)
695 .context("System time error")?
696 .as_secs();
697
698 if stored.expires_at > now + 300 {
699 stored.clone()
700 } else {
701 self.refresh_access_token(&stored.refresh_token).await?
702 }
703 } else {
704 anyhow::bail!("No OAuth credentials available. Run OAuth flow first.");
705 };
706
707 let expires_in = creds.expires_at
708 - std::time::SystemTime::now()
709 .duration_since(std::time::UNIX_EPOCH)
710 .context("System time error")?
711 .as_secs();
712
713 let cached = CachedTokens {
714 access_token: creds.access_token.clone(),
715 refresh_token: creds.refresh_token.clone(),
716 expires_at: std::time::Instant::now() + std::time::Duration::from_secs(expires_in),
717 };
718
719 let token = cached.access_token.clone();
720 *cache = Some(cached);
721 Ok(token)
722 }
723
    /// Converts internal messages into Chat Completions `messages` entries.
    ///
    /// Role-specific shapes:
    /// - Tool: only the first `ToolResult` part is used; anything else
    ///   degrades to an empty-content message.
    /// - Assistant: text parts are concatenated (no separator) and tool
    ///   calls become `tool_calls`; content is JSON null when text is empty
    ///   but tool calls exist.
    /// - System/User: text parts are joined with newlines.
    #[allow(dead_code)]
    fn convert_messages(messages: &[Message]) -> Vec<Value> {
        messages
            .iter()
            .map(|msg| {
                let role = match msg.role {
                    Role::System => "system",
                    Role::User => "user",
                    Role::Assistant => "assistant",
                    Role::Tool => "tool",
                };

                match msg.role {
                    Role::Tool => {
                        // Only the leading ToolResult is forwarded; extra
                        // parts in a Tool message are ignored.
                        if let Some(ContentPart::ToolResult {
                            tool_call_id,
                            content,
                        }) = msg.content.first()
                        {
                            json!({
                                "role": "tool",
                                "tool_call_id": tool_call_id,
                                "content": content
                            })
                        } else {
                            json!({ "role": role, "content": "" })
                        }
                    }
                    Role::Assistant => {
                        let text: String = msg
                            .content
                            .iter()
                            .filter_map(|p| match p {
                                ContentPart::Text { text } => Some(text.clone()),
                                _ => None,
                            })
                            .collect::<Vec<_>>()
                            .join("");

                        let tool_calls: Vec<Value> = msg
                            .content
                            .iter()
                            .filter_map(|p| match p {
                                ContentPart::ToolCall {
                                    id,
                                    name,
                                    arguments,
                                    ..
                                } => Some(json!({
                                    "id": id,
                                    "type": "function",
                                    "function": {
                                        "name": name,
                                        "arguments": arguments
                                    }
                                })),
                                _ => None,
                            })
                            .collect();

                        if tool_calls.is_empty() {
                            json!({ "role": "assistant", "content": text })
                        } else {
                            // With tool calls present, empty text becomes
                            // JSON null rather than "".
                            json!({
                                "role": "assistant",
                                "content": if text.is_empty() { Value::Null } else { json!(text) },
                                "tool_calls": tool_calls
                            })
                        }
                    }
                    _ => {
                        let text: String = msg
                            .content
                            .iter()
                            .filter_map(|p| match p {
                                ContentPart::Text { text } => Some(text.clone()),
                                _ => None,
                            })
                            .collect::<Vec<_>>()
                            .join("\n");
                        json!({ "role": role, "content": text })
                    }
                }
            })
            .collect()
    }
810
811 #[allow(dead_code)]
812 fn convert_tools(tools: &[ToolDefinition]) -> Vec<Value> {
813 tools
814 .iter()
815 .map(|t| {
816 json!({
817 "type": "function",
818 "function": {
819 "name": t.name,
820 "description": t.description,
821 "parameters": t.parameters
822 }
823 })
824 })
825 .collect()
826 }
827
828 fn convert_responses_tools(tools: &[ToolDefinition]) -> Vec<Value> {
829 tools
830 .iter()
831 .map(|t| {
832 json!({
833 "type": "function",
834 "name": t.name,
835 "description": t.description,
836 "strict": false,
837 "parameters": t.parameters
838 })
839 })
840 .collect()
841 }
842
843 fn extract_responses_instructions(messages: &[Message]) -> String {
844 let instructions = messages
845 .iter()
846 .filter(|msg| matches!(msg.role, Role::System))
847 .filter_map(|msg| {
848 let text = msg
849 .content
850 .iter()
851 .filter_map(|p| match p {
852 ContentPart::Text { text } => Some(text.clone()),
853 _ => None,
854 })
855 .collect::<Vec<_>>()
856 .join("\n");
857 (!text.is_empty()).then_some(text)
858 })
859 .collect::<Vec<_>>()
860 .join("\n\n");
861
862 if instructions.is_empty() {
863 DEFAULT_RESPONSES_INSTRUCTIONS.to_string()
864 } else {
865 instructions
866 }
867 }
868
    /// Converts internal messages into the Responses API `input` array.
    ///
    /// System messages are skipped (they travel in `instructions`). User
    /// text becomes `input_text`; assistant text becomes `output_text`,
    /// followed by any `function_call` items. Tool results are emitted as
    /// `function_call_output` only when their call id was seen on a prior
    /// assistant message — orphans are logged and dropped, since the API
    /// rejects outputs without a matching call.
    fn convert_messages_to_responses_input(messages: &[Message]) -> Vec<Value> {
        let mut input = Vec::new();
        // Call ids emitted so far; used to filter orphaned tool results.
        let mut known_tool_call_ids = std::collections::HashSet::new();

        for msg in messages {
            match msg.role {
                Role::System => {}
                Role::User => {
                    let text: String = msg
                        .content
                        .iter()
                        .filter_map(|p| match p {
                            ContentPart::Text { text } => Some(text.clone()),
                            _ => None,
                        })
                        .collect::<Vec<_>>()
                        .join("\n");
                    if !text.is_empty() {
                        input.push(json!({
                            "type": "message",
                            "role": "user",
                            "content": [{
                                "type": "input_text",
                                "text": text
                            }]
                        }));
                    }
                }
                Role::Assistant => {
                    let text: String = msg
                        .content
                        .iter()
                        .filter_map(|p| match p {
                            ContentPart::Text { text } => Some(text.clone()),
                            _ => None,
                        })
                        .collect::<Vec<_>>()
                        .join("");

                    if !text.is_empty() {
                        input.push(json!({
                            "type": "message",
                            "role": "assistant",
                            "content": [{
                                "type": "output_text",
                                "text": text
                            }]
                        }));
                    }

                    // Tool calls follow the assistant text, preserving order.
                    for part in &msg.content {
                        if let ContentPart::ToolCall {
                            id,
                            name,
                            arguments,
                            ..
                        } = part
                        {
                            known_tool_call_ids.insert(id.clone());
                            input.push(json!({
                                "type": "function_call",
                                "call_id": id,
                                "name": name,
                                "arguments": arguments,
                            }));
                        }
                    }
                }
                Role::Tool => {
                    for part in &msg.content {
                        if let ContentPart::ToolResult {
                            tool_call_id,
                            content,
                        } = part
                        {
                            if known_tool_call_ids.contains(tool_call_id) {
                                input.push(json!({
                                    "type": "function_call_output",
                                    "call_id": tool_call_id,
                                    "output": content,
                                }));
                            } else {
                                tracing::warn!(
                                    tool_call_id = %tool_call_id,
                                    "Skipping orphaned function_call_output while building Codex responses input"
                                );
                            }
                        }
                    }
                }
            }
        }

        input
    }
964
    /// True when auth goes through ChatGPT OAuth rather than a static API
    /// key; routing and model validation both branch on this.
    fn using_chatgpt_backend(&self) -> bool {
        self.static_api_key.is_none()
    }
968
    /// Extracts the ChatGPT account id from a JWT's payload.
    ///
    /// Decodes the (unverified) middle segment as base64url JSON and checks,
    /// in order: the `https://api.openai.com/auth` claim's
    /// `chatgpt_account_id`, a top-level `chatgpt_account_id`, and finally
    /// the id of the first entry in `organizations`. Returns `None` when
    /// the input isn't a parsable JWT or no candidate claim exists.
    fn extract_chatgpt_account_id_from_jwt(jwt: &str) -> Option<String> {
        let payload_b64 = jwt.split('.').nth(1)?;
        let payload = URL_SAFE_NO_PAD.decode(payload_b64).ok()?;
        let value: Value = serde_json::from_slice(&payload).ok()?;

        value
            .get("https://api.openai.com/auth")
            .and_then(Value::as_object)
            .and_then(|auth| auth.get("chatgpt_account_id"))
            .and_then(Value::as_str)
            .map(|s| s.to_string())
            .or_else(|| {
                value
                    .get("chatgpt_account_id")
                    .and_then(Value::as_str)
                    .map(|s| s.to_string())
            })
            .or_else(|| {
                value
                    .get("organizations")
                    .and_then(Value::as_array)
                    .and_then(|orgs| orgs.first())
                    .and_then(|org| org.get("id"))
                    .and_then(Value::as_str)
                    .map(|s| s.to_string())
            })
    }
996
    /// Public wrapper over [`Self::extract_chatgpt_account_id_from_jwt`].
    pub fn extract_chatgpt_account_id(token: &str) -> Option<String> {
        Self::extract_chatgpt_account_id_from_jwt(token)
    }
1000
    /// Best-effort ChatGPT account id resolution, in priority order:
    /// the provider's configured id, the supplied access token's JWT, the
    /// stored id_token's JWT, and finally the stored credential field.
    fn resolved_chatgpt_account_id(&self, access_token: &str) -> Option<String> {
        self.chatgpt_account_id
            .clone()
            .or_else(|| Self::extract_chatgpt_account_id_from_jwt(access_token))
            .or_else(|| {
                self.stored_credentials.as_ref().and_then(|creds| {
                    creds
                        .id_token
                        .as_deref()
                        .and_then(Self::extract_chatgpt_account_id_from_jwt)
                        .or_else(|| creds.chatgpt_account_id.clone())
                })
            })
    }
1015
    /// Parses a usage object from either naming scheme — Chat Completions
    /// (`prompt_tokens`/`completion_tokens`) or Responses
    /// (`input_tokens`/`output_tokens`) — into our `Usage`.
    ///
    /// Cached prompt tokens are split out of `prompt_tokens` so the two
    /// fields are disjoint. A missing usage object yields all zeros.
    fn parse_responses_usage(usage: Option<&Value>) -> Usage {
        let Some(usage) = usage else {
            return Usage::default();
        };

        let prompt_tokens = usage
            .get("prompt_tokens")
            .or_else(|| usage.get("input_tokens"))
            .and_then(Value::as_u64)
            .unwrap_or(0) as usize;

        let completion_tokens = usage
            .get("completion_tokens")
            .or_else(|| usage.get("output_tokens"))
            .and_then(Value::as_u64)
            .unwrap_or(0) as usize;

        // Fall back to the sum when the server omits an explicit total.
        let total_tokens = usage
            .get("total_tokens")
            .and_then(Value::as_u64)
            .unwrap_or((prompt_tokens + completion_tokens) as u64)
            as usize;

        let cached_tokens = usage
            .get("prompt_tokens_details")
            .or_else(|| usage.get("input_tokens_details"))
            .and_then(|d| d.get("cached_tokens"))
            .and_then(Value::as_u64)
            .unwrap_or(0) as usize;

        Usage {
            // Report only the non-cached portion as prompt tokens.
            prompt_tokens: prompt_tokens.saturating_sub(cached_tokens),
            completion_tokens,
            total_tokens,
            cache_read_tokens: if cached_tokens > 0 {
                Some(cached_tokens)
            } else {
                None
            },
            cache_write_tokens: None,
        }
    }
1064
1065 fn extract_json_string(value: Option<&Value>) -> Option<String> {
1066 match value {
1067 Some(Value::String(text)) => Some(text.clone()),
1068 Some(other) => serde_json::to_string(other).ok(),
1069 None => None,
1070 }
1071 }
1072
    /// Converts a Responses `output` array into content parts.
    ///
    /// `message` items contribute the concatenation of their
    /// `output_text`/`text` segments; `function_call` items become
    /// `ToolCall` parts (falling back from `call_id` to `id`). Returns the
    /// parts plus whether any tool call was found. Items missing required
    /// fields, and unknown item types, are silently skipped.
    #[allow(dead_code)]
    fn parse_responses_output_parts(output: &[Value]) -> (Vec<ContentPart>, bool) {
        let mut parts = Vec::new();
        let mut has_tool_calls = false;

        for item in output {
            match item.get("type").and_then(Value::as_str) {
                Some("message") => {
                    if let Some(content) = item.get("content").and_then(Value::as_array) {
                        let text = content
                            .iter()
                            .filter(|segment| {
                                matches!(
                                    segment.get("type").and_then(Value::as_str),
                                    Some("output_text") | Some("text")
                                )
                            })
                            .filter_map(|segment| {
                                segment
                                    .get("text")
                                    .and_then(Value::as_str)
                                    .map(str::to_string)
                            })
                            .collect::<Vec<_>>()
                            .join("");
                        if !text.is_empty() {
                            parts.push(ContentPart::Text { text });
                        }
                    }
                }
                Some("function_call") => {
                    let call_id = item
                        .get("call_id")
                        .or_else(|| item.get("id"))
                        .and_then(Value::as_str);
                    let name = item.get("name").and_then(Value::as_str);
                    let arguments = item.get("arguments").and_then(Value::as_str);
                    // All three fields are required to form a usable call.
                    if let (Some(call_id), Some(name), Some(arguments)) = (call_id, name, arguments)
                    {
                        has_tool_calls = true;
                        parts.push(ContentPart::ToolCall {
                            id: call_id.to_string(),
                            name: name.to_string(),
                            arguments: arguments.to_string(),
                            thought_signature: None,
                        });
                    }
                }
                _ => {}
            }
        }

        (parts, has_tool_calls)
    }
1127
1128 fn parse_model_thinking_level(model: &str) -> (String, Option<ThinkingLevel>) {
1129 let Some((base_model, level_str)) = model.rsplit_once(':') else {
1130 return (model.to_string(), None);
1131 };
1132 let Some(level) = ThinkingLevel::parse(level_str) else {
1133 return (model.to_string(), None);
1134 };
1135 if base_model.trim().is_empty() {
1136 return (model.to_string(), None);
1137 }
1138 (base_model.to_string(), Some(level))
1139 }
1140
    /// Reads the reasoning level from the environment, preferring
    /// `THINKING_LEVEL_ENV` over `REASONING_EFFORT_ENV`. Unset or invalid
    /// values yield `None`.
    fn env_thinking_level() -> Option<ThinkingLevel> {
        std::env::var(THINKING_LEVEL_ENV)
            .ok()
            .or_else(|| std::env::var(REASONING_EFFORT_ENV).ok())
            .as_deref()
            .and_then(ThinkingLevel::parse)
    }
1148
1149 fn model_supports_reasoning_effort(model: &str) -> bool {
1150 let normalized = model.to_ascii_lowercase();
1151 normalized.starts_with("gpt-5")
1152 || normalized.starts_with("o1")
1153 || normalized.starts_with("o3")
1154 || normalized.starts_with("o4")
1155 }
1156
    /// Resolves `-fast` model aliases to their base model plus the
    /// priority service tier; other models pass through untouched.
    fn parse_service_tier_model_alias(model: &str) -> (String, Option<CodexServiceTier>) {
        match model {
            "gpt-5.4-fast" => ("gpt-5.4".to_string(), Some(CodexServiceTier::Priority)),
            "gpt-5.5-fast" => ("gpt-5.5".to_string(), Some(CodexServiceTier::Priority)),
            _ => (model.to_string(), None),
        }
    }
1165
    /// Two-field convenience view over
    /// [`Self::resolve_model_and_reasoning_effort_and_service_tier`],
    /// discarding the service tier.
    fn resolve_model_and_reasoning_effort(model: &str) -> (String, Option<ThinkingLevel>) {
        let (base_model, level, _) =
            Self::resolve_model_and_reasoning_effort_and_service_tier(model);
        (base_model, level)
    }
1171
    /// Fully resolves a user-supplied model string.
    ///
    /// Order: strip any `:level` suffix, fall back to the environment for
    /// the level, resolve `-fast` aliases into a service tier, and finally
    /// drop the level for model families that don't accept reasoning
    /// effort.
    fn resolve_model_and_reasoning_effort_and_service_tier(
        model: &str,
    ) -> (String, Option<ThinkingLevel>, Option<CodexServiceTier>) {
        let (base_model, level_from_model) = Self::parse_model_thinking_level(model);
        let level = level_from_model.or_else(Self::env_thinking_level);
        let (base_model, service_tier) = Self::parse_service_tier_model_alias(&base_model);
        if !Self::model_supports_reasoning_effort(&base_model) {
            return (base_model, None, service_tier);
        }
        (base_model, level, service_tier)
    }
1183
    /// Writes `service_tier` into the request payload when a tier was
    /// resolved; leaves the payload untouched otherwise.
    fn apply_service_tier(payload: &mut Value, service_tier: Option<CodexServiceTier>) {
        if let Some(service_tier) = service_tier {
            payload["service_tier"] = json!(service_tier.as_str());
        }
    }
1189
    /// Builds a `ModelInfo` entry for this provider's catalog.
    ///
    /// Tools and streaming are always advertised; costs are reported as
    /// zero (subscription-based access).
    fn model_info(
        id: &str,
        name: &str,
        context_window: usize,
        max_output_tokens: usize,
        supports_vision: bool,
    ) -> ModelInfo {
        ModelInfo {
            id: id.to_string(),
            name: name.to_string(),
            provider: "openai-codex".to_string(),
            context_window,
            max_output_tokens: Some(max_output_tokens),
            supports_vision,
            supports_tools: true,
            supports_streaming: true,
            input_cost_per_million: Some(0.0),
            output_cost_per_million: Some(0.0),
        }
    }
1210
1211 fn api_key_hidden_model(model: &str) -> bool {
1212 matches!(model, "gpt-5.5" | "gpt-5.5-fast")
1213 }
1214
    /// Maps an OpenAI/ChatGPT HTTP error onto an actionable user-facing message.
    ///
    /// Two 401 shapes get bespoke guidance: a missing `model.request` scope,
    /// and a missing ChatGPT workspace/account identifier. Everything else
    /// falls through to a generic error string carrying the raw body.
    fn format_openai_api_error(status: StatusCode, body: &str, model: &str) -> String {
        if status == StatusCode::UNAUTHORIZED && body.contains("Missing scopes: model.request") {
            return format!(
                "OpenAI Codex OAuth token is missing required scope `model.request` for model `{model}`.\n\
                Re-run `codetether auth codex` and complete OAuth approval with a ChatGPT subscription account \
                that has model access in your org/project."
            );
        }
        // Body mentions of the account header or "workspace" indicate the token
        // lacks a workspace/account binding rather than a scope problem.
        if status == StatusCode::UNAUTHORIZED
            && (body.contains("chatgpt-account-id")
                || body.contains("ChatGPT-Account-ID")
                || body.contains("workspace"))
        {
            return "OpenAI Codex auth is missing a ChatGPT workspace/account identifier.\n\
                Re-run `codetether auth codex --device-code` and sign in to the intended workspace."
                .to_string();
        }
        format!("OpenAI API error ({status}): {body}")
    }
1234
    /// Builds a `response.create` WebSocket event for the plain OpenAI
    /// backend; thin wrapper over the backend-aware builder.
    fn build_responses_ws_create_event(
        request: &CompletionRequest,
        model: &str,
        reasoning_effort: Option<ThinkingLevel>,
        service_tier: Option<CodexServiceTier>,
    ) -> Value {
        Self::build_responses_ws_create_event_for_backend(
            request,
            model,
            reasoning_effort,
            service_tier,
            ResponsesWsBackend::OpenAi,
        )
    }
1249
1250 fn build_responses_ws_create_event_for_backend(
1251 request: &CompletionRequest,
1252 model: &str,
1253 reasoning_effort: Option<ThinkingLevel>,
1254 service_tier: Option<CodexServiceTier>,
1255 backend: ResponsesWsBackend,
1256 ) -> Value {
1257 let instructions = Self::extract_responses_instructions(&request.messages);
1258 let input = Self::convert_messages_to_responses_input(&request.messages);
1259 let tools = Self::convert_responses_tools(&request.tools);
1260
1261 let mut event = json!({
1262 "type": "response.create",
1263 "model": model,
1264 "store": false,
1265 "instructions": instructions,
1266 "input": input,
1267 });
1268
1269 if !tools.is_empty() {
1270 event["tools"] = json!(tools);
1271 }
1272 if let Some(level) = reasoning_effort {
1273 event["reasoning"] = json!({ "effort": level.as_str() });
1274 }
1275 Self::apply_service_tier(&mut event, service_tier);
1276 if backend == ResponsesWsBackend::OpenAi {
1277 event["tool_choice"] = json!("auto");
1278 event["parallel_tool_calls"] = json!(true);
1279 }
1280 if backend == ResponsesWsBackend::OpenAi
1281 && let Some(max_tokens) = request.max_tokens
1282 {
1283 event["max_output_tokens"] = json!(max_tokens);
1284 }
1285
1286 event
1287 }
1288
1289 fn finish_responses_tool_call(
1290 parser: &mut ResponsesSseParser,
1291 key: &str,
1292 chunks: &mut Vec<StreamChunk>,
1293 ) {
1294 if let Some(state) = parser.tools.get_mut(key)
1295 && !state.finished
1296 {
1297 chunks.push(StreamChunk::ToolCallEnd {
1298 id: state.call_id.clone(),
1299 });
1300 state.finished = true;
1301 }
1302 }
1303
1304 fn parse_responses_event(
1305 parser: &mut ResponsesSseParser,
1306 event: &Value,
1307 chunks: &mut Vec<StreamChunk>,
1308 ) {
1309 match event.get("type").and_then(Value::as_str) {
1310 Some("response.output_text.delta") => {
1311 if let Some(delta) = event.get("delta").and_then(Value::as_str) {
1312 chunks.push(StreamChunk::Text(delta.to_string()));
1313 }
1314 }
1315 Some("response.output_item.added") => {
1316 if let Some(item) = event.get("item") {
1317 Self::record_responses_tool_item(parser, item, chunks, false);
1318 }
1319 }
1320 Some("response.function_call_arguments.delta") => {
1321 Self::record_responses_tool_arguments(parser, &event, chunks, false);
1322 }
1323 Some("response.function_call_arguments.done") => {
1324 Self::record_responses_tool_arguments(parser, &event, chunks, true);
1325 }
1326 Some("response.output_item.done") => {
1327 if let Some(item) = event.get("item")
1328 && item.get("type").and_then(Value::as_str) == Some("function_call")
1329 && let Some(key) = item
1330 .get("id")
1331 .and_then(Value::as_str)
1332 .or_else(|| item.get("call_id").and_then(Value::as_str))
1333 {
1334 Self::record_responses_tool_item(parser, item, chunks, true);
1335 Self::finish_responses_tool_call(parser, key, chunks);
1336 }
1337 }
1338 Some("response.completed") | Some("response.done") => {
1339 if let Some(output) = event
1340 .get("response")
1341 .and_then(|response| response.get("output"))
1342 .and_then(Value::as_array)
1343 {
1344 for item in output {
1345 if item.get("type").and_then(Value::as_str) == Some("function_call")
1346 && let Some(key) = item
1347 .get("id")
1348 .and_then(Value::as_str)
1349 .or_else(|| item.get("call_id").and_then(Value::as_str))
1350 {
1351 Self::record_responses_tool_item(parser, item, chunks, true);
1352 Self::finish_responses_tool_call(parser, key, chunks);
1353 }
1354 }
1355 }
1356 let failed_message = event
1357 .get("response")
1358 .and_then(|response| response.get("status"))
1359 .and_then(Value::as_str)
1360 .filter(|status| matches!(*status, "failed" | "cancelled" | "incomplete"))
1361 .map(|_| {
1362 event
1363 .get("response")
1364 .and_then(|response| response.get("error"))
1365 .and_then(|error| error.get("message"))
1366 .and_then(Value::as_str)
1367 .unwrap_or("Response failed")
1368 .to_string()
1369 });
1370 if let Some(message) = failed_message {
1371 chunks.push(StreamChunk::Error(message));
1372 return;
1373 }
1374 let usage = event
1375 .get("response")
1376 .and_then(|response| response.get("usage"))
1377 .map(|usage| Self::parse_responses_usage(Some(usage)));
1378 chunks.push(StreamChunk::Done { usage });
1379 }
1380 Some("response.failed") => {
1381 let message = event
1382 .get("response")
1383 .and_then(|response| response.get("error"))
1384 .and_then(|error| error.get("message"))
1385 .or_else(|| event.get("error").and_then(|error| error.get("message")))
1386 .and_then(Value::as_str)
1387 .unwrap_or("Response failed")
1388 .to_string();
1389 chunks.push(StreamChunk::Error(message));
1390 }
1391 Some("error") => {
1392 let message = event
1393 .get("error")
1394 .and_then(|error| error.get("message"))
1395 .or_else(|| event.get("message"))
1396 .and_then(Value::as_str)
1397 .unwrap_or("Realtime error")
1398 .to_string();
1399 chunks.push(StreamChunk::Error(message));
1400 }
1401 _ => {}
1402 }
1403 }
1404
1405 fn parse_responses_sse_event(
1406 parser: &mut ResponsesSseParser,
1407 data: &str,
1408 chunks: &mut Vec<StreamChunk>,
1409 ) {
1410 if data == "[DONE]" {
1411 chunks.push(StreamChunk::Done { usage: None });
1412 return;
1413 }
1414
1415 let Ok(event): Result<Value, _> = serde_json::from_str(data) else {
1416 return;
1417 };
1418
1419 Self::parse_responses_event(parser, &event, chunks);
1420 }
1421
    /// Registers (or updates) a function-call output item in the tool table.
    ///
    /// Items are keyed by `id` (falling back to `call_id`), while the id put
    /// on emitted chunks is `call_id` (falling back to `id`). Emits
    /// `ToolCallStart` the first time a name is known, and argument deltas
    /// when `include_arguments` is set (final item snapshots). Returns the
    /// call id, or `None` for non-function-call items.
    fn record_responses_tool_item(
        parser: &mut ResponsesSseParser,
        item: &Value,
        chunks: &mut Vec<StreamChunk>,
        include_arguments: bool,
    ) -> Option<String> {
        if item.get("type").and_then(Value::as_str) != Some("function_call") {
            return None;
        }

        // Note: lookup key and outward call id use opposite fallback orders.
        let key = item
            .get("id")
            .and_then(Value::as_str)
            .or_else(|| item.get("call_id").and_then(Value::as_str))?;
        let call_id = item
            .get("call_id")
            .and_then(Value::as_str)
            .or_else(|| item.get("id").and_then(Value::as_str))?
            .to_string();
        let name = item.get("name").and_then(Value::as_str).map(str::to_string);
        let arguments = if include_arguments {
            Self::extract_json_string(item.get("arguments"))
        } else {
            None
        };

        let state = parser
            .tools
            .entry(key.to_string())
            .or_insert_with(|| ResponsesToolState {
                call_id: call_id.clone(),
                ..ResponsesToolState::default()
            });
        // An entry created earlier by an argument event may lack these fields.
        if state.call_id.is_empty() {
            state.call_id = call_id.clone();
        }
        if state.name.is_none() {
            state.name = name;
        }
        // Start is deferred until a name is known so the chunk is complete.
        if !state.started
            && let Some(name) = state.name.clone()
        {
            chunks.push(StreamChunk::ToolCallStart {
                id: state.call_id.clone(),
                name,
            });
            state.started = true;
        }
        if let Some(arguments) = arguments {
            Self::emit_missing_responses_tool_arguments(state, arguments, chunks);
        }

        Some(state.call_id.clone())
    }
1476
    /// Applies a `function_call_arguments.delta`/`.done` event to the tool table.
    ///
    /// The call is looked up by `item_id` (falling back to `call_id`); the
    /// entry is created on the fly because argument events can arrive before
    /// the item itself. On `.done` (`use_final_arguments`) the full argument
    /// string is preferred over the delta so any missed prefix is re-emitted.
    fn record_responses_tool_arguments(
        parser: &mut ResponsesSseParser,
        event: &Value,
        chunks: &mut Vec<StreamChunk>,
        use_final_arguments: bool,
    ) {
        let key = event
            .get("item_id")
            .and_then(Value::as_str)
            .or_else(|| event.get("call_id").and_then(Value::as_str));
        let Some(key) = key else {
            return;
        };

        // Best available call id if this event creates the entry.
        let fallback_call_id = event
            .get("call_id")
            .and_then(Value::as_str)
            .unwrap_or(key)
            .to_string();
        let state = parser
            .tools
            .entry(key.to_string())
            .or_insert_with(|| ResponsesToolState {
                call_id: fallback_call_id.clone(),
                ..ResponsesToolState::default()
            });
        if state.call_id.is_empty() {
            state.call_id = fallback_call_id;
        }

        // Some backends carry the tool name on the argument event itself.
        if !state.started
            && let Some(name) = event.get("name").and_then(Value::as_str)
        {
            state.name = Some(name.to_string());
            chunks.push(StreamChunk::ToolCallStart {
                id: state.call_id.clone(),
                name: name.to_string(),
            });
            state.started = true;
        }

        let arguments = if use_final_arguments {
            Self::extract_json_string(event.get("arguments")).or_else(|| {
                event
                    .get("delta")
                    .and_then(Value::as_str)
                    .map(str::to_string)
            })
        } else {
            event
                .get("delta")
                .and_then(Value::as_str)
                .map(str::to_string)
        };

        if let Some(arguments) = arguments {
            Self::emit_missing_responses_tool_arguments(state, arguments, chunks);
        }
    }
1536
1537 fn emit_missing_responses_tool_arguments(
1538 state: &mut ResponsesToolState,
1539 arguments: String,
1540 chunks: &mut Vec<StreamChunk>,
1541 ) {
1542 let delta = if arguments.starts_with(&state.emitted_arguments) {
1543 arguments[state.emitted_arguments.len()..].to_string()
1544 } else if state.emitted_arguments.is_empty() {
1545 arguments.clone()
1546 } else if arguments == state.emitted_arguments {
1547 String::new()
1548 } else {
1549 arguments.clone()
1550 };
1551
1552 if !delta.is_empty() {
1553 chunks.push(StreamChunk::ToolCallDelta {
1554 id: state.call_id.clone(),
1555 arguments_delta: delta.clone(),
1556 });
1557 state.emitted_arguments.push_str(&delta);
1558 }
1559 }
1560
    /// Feeds raw SSE bytes into the parser, returning chunks completed by this read.
    ///
    /// Bytes are appended to `line_buffer` via lossy UTF-8 conversion;
    /// complete lines are split off and CR-trimmed. `data:` lines accumulate
    /// until a blank line terminates the event, at which point the joined
    /// payload is dispatched. A trailing partial line stays buffered.
    fn parse_responses_sse_bytes(
        parser: &mut ResponsesSseParser,
        bytes: &[u8],
    ) -> Vec<StreamChunk> {
        parser.line_buffer.push_str(&String::from_utf8_lossy(bytes));
        let mut chunks = Vec::new();

        while let Some(line_end) = parser.line_buffer.find('\n') {
            let mut line = parser.line_buffer[..line_end].to_string();
            parser.line_buffer.drain(..=line_end);

            // Tolerate CRLF line endings.
            if line.ends_with('\r') {
                line.pop();
            }

            // Blank line marks the end of one SSE event.
            if line.is_empty() {
                if !parser.event_data_lines.is_empty() {
                    let data = parser.event_data_lines.join("\n");
                    parser.event_data_lines.clear();
                    Self::parse_responses_sse_event(parser, &data, &mut chunks);
                }
                continue;
            }

            if let Some(data) = line.strip_prefix("data:") {
                // The single space after "data:" is optional per the SSE spec.
                let data = data.strip_prefix(' ').unwrap_or(data);
                parser.event_data_lines.push(data.to_string());
            }
        }

        chunks
    }
1593
    /// Flushes the SSE parser at end of stream.
    ///
    /// Any leftover partial line is treated as a final `data:` line, and a
    /// pending (unterminated) event is dispatched if data accumulated.
    fn finish_responses_sse_parser(parser: &mut ResponsesSseParser) -> Vec<StreamChunk> {
        let mut chunks = Vec::new();

        if !parser.line_buffer.is_empty() {
            let mut line = std::mem::take(&mut parser.line_buffer);
            if line.ends_with('\r') {
                line.pop();
            }
            if let Some(data) = line.strip_prefix("data:") {
                let data = data.strip_prefix(' ').unwrap_or(data);
                parser.event_data_lines.push(data.to_string());
            }
        }

        if !parser.event_data_lines.is_empty() {
            let data = parser.event_data_lines.join("\n");
            parser.event_data_lines.clear();
            Self::parse_responses_sse_event(parser, &data, &mut chunks);
        }

        chunks
    }
1616
    /// Non-streaming completion via the ChatGPT Codex backend: runs the
    /// streaming path and collects the chunks into one response.
    async fn complete_with_chatgpt_responses(
        &self,
        request: CompletionRequest,
        access_token: String,
    ) -> Result<CompletionResponse> {
        let stream = self
            .complete_stream_with_chatgpt_responses(request, access_token)
            .await?;
        Self::collect_stream_completion(stream).await
    }
1627
    /// Non-streaming completion via the OpenAI Responses API: runs the
    /// streaming path and collects the chunks into one response.
    async fn complete_with_openai_responses(
        &self,
        request: CompletionRequest,
        api_key: String,
    ) -> Result<CompletionResponse> {
        let stream = self
            .complete_stream_with_openai_responses(request, api_key)
            .await?;
        Self::collect_stream_completion(stream).await
    }
1638
    /// Drains a chunk stream into a single `CompletionResponse`.
    ///
    /// Text deltas are concatenated; tool calls are accumulated by id — a
    /// delta arriving before its start creates a placeholder named "tool"
    /// which the later start renames. A `Done` chunk supplies usage; an
    /// `Error` chunk aborts with that message.
    async fn collect_stream_completion(
        mut stream: BoxStream<'static, StreamChunk>,
    ) -> Result<CompletionResponse> {
        #[derive(Default)]
        struct ToolAccumulator {
            id: String,
            name: String,
            arguments: String,
        }

        let mut text = String::new();
        let mut tools = Vec::<ToolAccumulator>::new();
        // Maps chunk id -> index into `tools`, so deltas find their call.
        let mut tool_index_by_id = HashMap::<String, usize>::new();
        let mut usage = Usage::default();

        while let Some(chunk) = stream.next().await {
            match chunk {
                StreamChunk::Text(delta) => text.push_str(&delta),
                StreamChunk::ToolCallStart { id, name } => {
                    let next_idx = tools.len();
                    let idx = *tool_index_by_id.entry(id.clone()).or_insert(next_idx);
                    if idx == next_idx {
                        // First sighting of this id: create the accumulator.
                        tools.push(ToolAccumulator {
                            id,
                            name,
                            arguments: String::new(),
                        });
                    } else if tools[idx].name == "tool" {
                        // Replace the placeholder name set by an early delta.
                        tools[idx].name = name;
                    }
                }
                StreamChunk::ToolCallDelta {
                    id,
                    arguments_delta,
                } => {
                    if let Some(idx) = tool_index_by_id.get(&id).copied() {
                        tools[idx].arguments.push_str(&arguments_delta);
                    } else {
                        // Delta arrived before its start: placeholder entry.
                        let idx = tools.len();
                        tool_index_by_id.insert(id.clone(), idx);
                        tools.push(ToolAccumulator {
                            id,
                            name: "tool".to_string(),
                            arguments: arguments_delta,
                        });
                    }
                }
                StreamChunk::ToolCallEnd { .. } => {}
                StreamChunk::Done { usage: done_usage } => {
                    if let Some(done_usage) = done_usage {
                        usage = done_usage;
                    }
                }
                StreamChunk::Error(message) => anyhow::bail!(message),
            }
        }

        let mut content = Vec::new();
        if !text.is_empty() {
            content.push(ContentPart::Text { text });
        }
        for tool in tools {
            content.push(ContentPart::ToolCall {
                id: tool.id,
                name: tool.name,
                arguments: tool.arguments,
                thought_signature: None,
            });
        }

        // Any tool call implies the ToolCalls finish reason.
        let finish_reason = if content
            .iter()
            .any(|part| matches!(part, ContentPart::ToolCall { .. }))
        {
            FinishReason::ToolCalls
        } else {
            FinishReason::Stop
        };

        Ok(CompletionResponse {
            message: Message {
                role: Role::Assistant,
                content,
            },
            usage,
            finish_reason,
        })
    }
1727
    /// Streams a completion through the ChatGPT Codex backend, preferring the
    /// Responses WebSocket and falling back to HTTP SSE when the socket
    /// cannot be established. Requires a resolvable ChatGPT account id.
    async fn complete_stream_with_chatgpt_responses(
        &self,
        request: CompletionRequest,
        access_token: String,
    ) -> Result<BoxStream<'static, StreamChunk>> {
        let account_id = self.resolved_chatgpt_account_id(&access_token).context(
            "OpenAI Codex OAuth token is missing ChatGPT workspace/account ID. Re-run `codetether auth codex --device-code`.",
        )?;
        match self
            .complete_stream_with_realtime(
                request.clone(),
                access_token.clone(),
                Some(account_id.clone()),
                "chatgpt-codex-responses-ws",
                ResponsesWsBackend::ChatGptCodex,
            )
            .await
        {
            Ok(stream) => Ok(stream),
            Err(error) => {
                tracing::warn!(
                    error = %error,
                    "Responses WebSocket connect failed for ChatGPT Codex backend; falling back to HTTP responses streaming"
                );
                self.complete_stream_with_chatgpt_http_responses(request, access_token, account_id)
                    .await
            }
        }
    }
1757
    /// Streams a completion through the OpenAI backend, preferring the
    /// Responses WebSocket and falling back to HTTP SSE when the socket
    /// cannot be established.
    async fn complete_stream_with_openai_responses(
        &self,
        request: CompletionRequest,
        api_key: String,
    ) -> Result<BoxStream<'static, StreamChunk>> {
        match self
            .complete_stream_with_realtime(
                request.clone(),
                api_key.clone(),
                None,
                "openai-responses-ws",
                ResponsesWsBackend::OpenAi,
            )
            .await
        {
            Ok(stream) => Ok(stream),
            Err(error) => {
                tracing::warn!(
                    error = %error,
                    "Responses WebSocket connect failed for OpenAI backend; falling back to HTTP responses streaming"
                );
                self.complete_stream_with_openai_http_responses(request, api_key)
                    .await
            }
        }
    }
1784
    /// HTTP SSE fallback for the ChatGPT Codex backend.
    ///
    /// NOTE(review): keep the request body in sync with
    /// `complete_stream_with_openai_http_responses`; the two differ only in
    /// URL, auth header, and the `chatgpt-account-id` header.
    async fn complete_stream_with_chatgpt_http_responses(
        &self,
        request: CompletionRequest,
        access_token: String,
        account_id: String,
    ) -> Result<BoxStream<'static, StreamChunk>> {
        let (model, reasoning_effort, service_tier) =
            Self::resolve_model_and_reasoning_effort_and_service_tier(&request.model);
        let instructions = Self::extract_responses_instructions(&request.messages);
        let input = Self::convert_messages_to_responses_input(&request.messages);
        let tools = Self::convert_responses_tools(&request.tools);

        let mut body = json!({
            "model": model,
            "instructions": instructions,
            "input": input,
            "stream": true,
            "store": false,
            "tool_choice": "auto",
            "parallel_tool_calls": true,
        });

        if !tools.is_empty() {
            body["tools"] = json!(tools);
        }
        if let Some(level) = reasoning_effort {
            body["reasoning"] = json!({ "effort": level.as_str() });
        }
        Self::apply_service_tier(&mut body, service_tier);

        tracing::info!(
            backend = "chatgpt-codex-responses-http",
            instructions_len = body
                .get("instructions")
                .and_then(|v| v.as_str())
                .map(str::len)
                .unwrap_or(0),
            input_items = body
                .get("input")
                .and_then(|v| v.as_array())
                .map(Vec::len)
                .unwrap_or(0),
            has_tools = !tools.is_empty(),
            "Sending HTTP responses request"
        );

        let response = self
            .client
            .post(format!("{}/responses", CHATGPT_CODEX_API_URL))
            .header("Authorization", format!("Bearer {}", access_token))
            .header("chatgpt-account-id", account_id)
            .header("Content-Type", "application/json")
            .json(&body)
            .send()
            .await
            .context("Failed to send streaming request to ChatGPT Codex backend")?;

        let status = response.status();
        if !status.is_success() {
            let body = response.text().await.unwrap_or_default();
            anyhow::bail!(Self::format_openai_api_error(status, &body, &request.model));
        }

        // Convert the SSE byte stream into StreamChunks, flushing the parser
        // when the HTTP stream ends.
        let stream = async_stream::stream! {
            let mut parser = ResponsesSseParser::default();
            let mut byte_stream = response.bytes_stream();

            while let Some(result) = byte_stream.next().await {
                match result {
                    Ok(bytes) => {
                        for chunk in Self::parse_responses_sse_bytes(&mut parser, &bytes) {
                            yield chunk;
                        }
                    }
                    Err(error) => yield StreamChunk::Error(error.to_string()),
                }
            }

            for chunk in Self::finish_responses_sse_parser(&mut parser) {
                yield chunk;
            }
        };

        Ok(Box::pin(stream))
    }
1870
    /// HTTP SSE fallback for the plain OpenAI Responses API.
    ///
    /// NOTE(review): keep the request body in sync with
    /// `complete_stream_with_chatgpt_http_responses`; the two differ only in
    /// URL and headers.
    async fn complete_stream_with_openai_http_responses(
        &self,
        request: CompletionRequest,
        api_key: String,
    ) -> Result<BoxStream<'static, StreamChunk>> {
        let (model, reasoning_effort, service_tier) =
            Self::resolve_model_and_reasoning_effort_and_service_tier(&request.model);
        let instructions = Self::extract_responses_instructions(&request.messages);
        let input = Self::convert_messages_to_responses_input(&request.messages);
        let tools = Self::convert_responses_tools(&request.tools);

        let mut body = json!({
            "model": model,
            "instructions": instructions,
            "input": input,
            "stream": true,
            "store": false,
            "tool_choice": "auto",
            "parallel_tool_calls": true,
        });

        if !tools.is_empty() {
            body["tools"] = json!(tools);
        }
        if let Some(level) = reasoning_effort {
            body["reasoning"] = json!({ "effort": level.as_str() });
        }
        Self::apply_service_tier(&mut body, service_tier);

        tracing::info!(
            backend = "openai-responses-http",
            instructions_len = body
                .get("instructions")
                .and_then(|v| v.as_str())
                .map(str::len)
                .unwrap_or(0),
            input_items = body
                .get("input")
                .and_then(|v| v.as_array())
                .map(Vec::len)
                .unwrap_or(0),
            has_tools = !tools.is_empty(),
            "Sending HTTP responses request"
        );

        let response = self
            .client
            .post(format!("{}/responses", OPENAI_API_URL))
            .header("Authorization", format!("Bearer {}", api_key))
            .header("Content-Type", "application/json")
            .json(&body)
            .send()
            .await
            .context("Failed to send streaming request to OpenAI responses API")?;

        let status = response.status();
        if !status.is_success() {
            let body = response.text().await.unwrap_or_default();
            anyhow::bail!(Self::format_openai_api_error(status, &body, &request.model));
        }

        // Convert the SSE byte stream into StreamChunks, flushing the parser
        // when the HTTP stream ends.
        let stream = async_stream::stream! {
            let mut parser = ResponsesSseParser::default();
            let mut byte_stream = response.bytes_stream();

            while let Some(result) = byte_stream.next().await {
                match result {
                    Ok(bytes) => {
                        for chunk in Self::parse_responses_sse_bytes(&mut parser, &bytes) {
                            yield chunk;
                        }
                    }
                    Err(error) => yield StreamChunk::Error(error.to_string()),
                }
            }

            for chunk in Self::finish_responses_sse_parser(&mut parser) {
                yield chunk;
            }
        };

        Ok(Box::pin(stream))
    }
1954
    /// Streams a completion over the Responses WebSocket.
    ///
    /// Sends a single `response.create` event, then converts incoming events
    /// into chunks until a terminal `Done`/`Error` chunk is produced, the
    /// socket closes, or a receive error occurs. The socket is closed
    /// best-effort afterwards.
    async fn complete_stream_with_realtime(
        &self,
        request: CompletionRequest,
        access_token: String,
        chatgpt_account_id: Option<String>,
        backend: &'static str,
        ws_backend: ResponsesWsBackend,
    ) -> Result<BoxStream<'static, StreamChunk>> {
        let (model, reasoning_effort, service_tier) =
            Self::resolve_model_and_reasoning_effort_and_service_tier(&request.model);
        let body = Self::build_responses_ws_create_event_for_backend(
            &request,
            &model,
            reasoning_effort,
            service_tier,
            ws_backend,
        );
        tracing::info!(
            backend = backend,
            instructions_len = body
                .get("instructions")
                .and_then(|v| v.as_str())
                .map(str::len)
                .unwrap_or(0),
            input_items = body
                .get("input")
                .and_then(|v| v.as_array())
                .map(Vec::len)
                .unwrap_or(0),
            has_tools = body
                .get("tools")
                .and_then(|v| v.as_array())
                .is_some_and(|tools| !tools.is_empty()),
            "Sending responses websocket request"
        );

        // Connect before constructing the stream so connection errors surface
        // as Err from this function (allowing the HTTP fallback) rather than
        // as an Error chunk.
        let connection = self
            .connect_responses_ws_with_token(
                &access_token,
                chatgpt_account_id.as_deref(),
                ws_backend,
            )
            .await?;

        let stream = async_stream::stream! {
            let mut connection = connection;
            let mut parser = ResponsesSseParser::default();
            if let Err(error) = connection.send_event(&body).await {
                yield StreamChunk::Error(error.to_string());
                let _ = connection.close().await;
                return;
            }

            // Track whether a terminal chunk was seen so a premature socket
            // close can be reported as an error.
            let mut saw_terminal = false;
            loop {
                match connection.recv_event().await {
                    Ok(Some(event)) => {
                        let mut chunks = Vec::new();
                        Self::parse_responses_event(&mut parser, &event, &mut chunks);
                        for chunk in chunks {
                            if matches!(chunk, StreamChunk::Done { .. } | StreamChunk::Error(_)) {
                                saw_terminal = true;
                            }
                            yield chunk;
                        }
                        if saw_terminal {
                            break;
                        }
                    }
                    Ok(None) => {
                        if !saw_terminal {
                            yield StreamChunk::Error("Realtime WebSocket closed before response completion".to_string());
                        }
                        break;
                    }
                    Err(error) => {
                        yield StreamChunk::Error(error.to_string());
                        break;
                    }
                }
            }

            let _ = connection.close().await;
        };

        Ok(Box::pin(stream))
    }
2042}
2043
2044#[async_trait]
2045impl Provider for OpenAiCodexProvider {
2046 fn name(&self) -> &str {
2047 "openai-codex"
2048 }
2049
2050 async fn list_models(&self) -> Result<Vec<ModelInfo>> {
2051 let mut models = vec![
2052 Self::model_info("gpt-5", "GPT-5", 400_000, 128_000, false),
2053 Self::model_info("gpt-5-mini", "GPT-5 Mini", 264_000, 64_000, false),
2054 Self::model_info("gpt-5.1-codex", "GPT-5.1 Codex", 400_000, 128_000, false),
2055 Self::model_info("gpt-5.2", "GPT-5.2", 400_000, 128_000, false),
2056 Self::model_info("gpt-5.3-codex", "GPT-5.3 Codex", 400_000, 128_000, false),
2057 Self::model_info("gpt-5.4", "GPT-5.4", 272_000, 128_000, false),
2058 Self::model_info("gpt-5.4-fast", "GPT-5.4 Fast", 272_000, 128_000, false),
2059 Self::model_info("gpt-5.4-pro", "GPT-5.4 Pro", 272_000, 128_000, false),
2060 Self::model_info("gpt-5.5", "GPT-5.5", 400_000, 128_000, false),
2061 Self::model_info("gpt-5.5-fast", "GPT-5.5 Fast", 400_000, 128_000, false),
2062 Self::model_info("o3", "O3", 200_000, 100_000, true),
2063 Self::model_info("o4-mini", "O4 Mini", 200_000, 100_000, true),
2064 ];
2065
2066 if self.using_chatgpt_backend() {
2067 models.retain(|model| Self::chatgpt_supported_models().contains(&model.id.as_str()));
2068 } else {
2069 models.retain(|model| !Self::api_key_hidden_model(&model.id));
2070 }
2071
2072 Ok(models)
2073 }
2074
2075 async fn complete(&self, request: CompletionRequest) -> Result<CompletionResponse> {
2076 self.validate_model_for_backend(&request.model)?;
2077 let access_token = self.get_access_token().await?;
2078 if self.using_chatgpt_backend() {
2079 return self
2080 .complete_with_chatgpt_responses(request, access_token)
2081 .await;
2082 }
2083 self.complete_with_openai_responses(request, access_token)
2084 .await
2085 }
2086
2087 async fn complete_stream(
2088 &self,
2089 request: CompletionRequest,
2090 ) -> Result<BoxStream<'static, StreamChunk>> {
2091 self.validate_model_for_backend(&request.model)?;
2092 let access_token = self.get_access_token().await?;
2093 if self.using_chatgpt_backend() {
2094 return self
2095 .complete_stream_with_chatgpt_responses(request, access_token)
2096 .await;
2097 }
2098 self.complete_stream_with_openai_responses(request, access_token)
2099 .await
2100 }
2101}
2102
2103#[cfg(test)]
2104mod tests {
2105 use super::*;
2106 use futures::stream;
2107 use tokio::io::duplex;
2108 use tokio_tungstenite::{
2109 accept_hdr_async, client_async,
2110 tungstenite::{
2111 Message as WsMessage,
2112 handshake::server::{Request as ServerRequest, Response as ServerResponse},
2113 },
2114 };
2115
    // PKCE pair is non-empty and the challenge differs from the verifier.
    #[test]
    fn test_generate_pkce() {
        let pkce = OpenAiCodexProvider::generate_pkce();
        assert!(!pkce.verifier.is_empty());
        assert!(!pkce.challenge.is_empty());
        assert_ne!(pkce.verifier, pkce.challenge);
    }
2123
    // OAuth state nonce has the expected fixed length.
    #[test]
    fn test_generate_state() {
        let state = OpenAiCodexProvider::generate_state();
        assert_eq!(state.len(), 32);
    }
2129
    // A 401 mentioning the missing `model.request` scope yields re-auth guidance.
    #[test]
    fn formats_scope_error_with_actionable_message() {
        let body = r#"{"error":{"message":"Missing scopes: model.request"}}"#;
        let msg = OpenAiCodexProvider::format_openai_api_error(
            StatusCode::UNAUTHORIZED,
            body,
            "gpt-5.3-codex",
        );
        assert!(msg.contains("model.request"));
        assert!(msg.contains("codetether auth codex"));
    }
2141
    // The `:level` suffix on a reasoning-capable model is stripped and parsed,
    // including the `none` and `xhigh` levels.
    #[test]
    fn parses_model_suffix_for_thinking_level() {
        let (model, level) =
            OpenAiCodexProvider::resolve_model_and_reasoning_effort("gpt-5.3-codex:high");
        assert_eq!(model, "gpt-5.3-codex");
        assert_eq!(level.map(ThinkingLevel::as_str), Some("high"));

        let (model, level) =
            OpenAiCodexProvider::resolve_model_and_reasoning_effort("gpt-5.5:none");
        assert_eq!(model, "gpt-5.5");
        assert_eq!(level.map(ThinkingLevel::as_str), Some("none"));

        let (model, level) =
            OpenAiCodexProvider::resolve_model_and_reasoning_effort("gpt-5.5:xhigh");
        assert_eq!(model, "gpt-5.5");
        assert_eq!(level.map(ThinkingLevel::as_str), Some("xhigh"));
    }
2159
    // "-fast" aliases resolve to the base model plus the priority service
    // tier, while the `:level` suffix is still honored.
    #[test]
    fn maps_fast_model_alias_to_priority_service_tier() {
        let (model, level, service_tier) =
            OpenAiCodexProvider::resolve_model_and_reasoning_effort_and_service_tier(
                "gpt-5.4-fast:high",
            );
        assert_eq!(model, "gpt-5.4");
        assert_eq!(level.map(ThinkingLevel::as_str), Some("high"));
        assert_eq!(service_tier.map(CodexServiceTier::as_str), Some("priority"));

        let (model, level, service_tier) =
            OpenAiCodexProvider::resolve_model_and_reasoning_effort_and_service_tier(
                "gpt-5.5-fast:high",
            );
        assert_eq!(model, "gpt-5.5");
        assert_eq!(level.map(ThinkingLevel::as_str), Some("high"));
        assert_eq!(service_tier.map(CodexServiceTier::as_str), Some("priority"));
    }
2178
    // Suffixes that are not valid thinking levels stay part of the model id
    // and produce no reasoning level.
    #[test]
    fn ignores_unknown_model_suffix() {
        let (model, level) =
            OpenAiCodexProvider::resolve_model_and_reasoning_effort("gpt-5.3-codex:turbo");
        assert_eq!(model, "gpt-5.3-codex:turbo");
        assert_eq!(level, None);

        let (model, level) =
            OpenAiCodexProvider::resolve_model_and_reasoning_effort("gpt-5.5:minimal");
        assert_eq!(model, "gpt-5.5:minimal");
        assert_eq!(level, None);
    }
2191
    // A valid suffix on a non-reasoning model is stripped from the id but the
    // level is dropped.
    #[test]
    fn skips_reasoning_effort_for_non_reasoning_models() {
        let (model, level) = OpenAiCodexProvider::resolve_model_and_reasoning_effort("gpt-4o:high");
        assert_eq!(model, "gpt-4o");
        assert_eq!(level, None);
    }
2198
    // ChatGPT-backend listing includes the 5.4/5.5 family (incl. fast aliases)
    // but excludes the pro model.
    #[tokio::test]
    async fn lists_chatgpt_codex_models() {
        let provider = OpenAiCodexProvider::new();
        let models = provider
            .list_models()
            .await
            .expect("model listing should succeed");

        assert!(models.iter().any(|model| model.id == "gpt-5.4"));
        assert!(models.iter().any(|model| model.id == "gpt-5.4-fast"));
        assert!(models.iter().any(|model| model.id == "gpt-5.5"));
        assert!(models.iter().any(|model| model.id == "gpt-5.5-fast"));
        assert!(!models.iter().any(|model| model.id == "gpt-5.4-pro"));
    }
2213
    // API-key-backend listing hides the GPT-5.5 family for now.
    #[tokio::test]
    async fn omits_gpt_5_5_from_api_key_model_listing_for_now() {
        let provider = OpenAiCodexProvider::from_api_key("test-key".to_string());
        let models = provider
            .list_models()
            .await
            .expect("model listing should succeed");

        assert!(!models.iter().any(|model| model.id == "gpt-5.5"));
        assert!(!models.iter().any(|model| model.id == "gpt-5.5-fast"));
    }
2225
    // ChatGPT backend rejects the pro model with an explanatory error.
    #[test]
    fn rejects_pro_model_for_chatgpt_backend() {
        let provider = OpenAiCodexProvider::new();
        let err = provider
            .validate_model_for_backend("gpt-5.4-pro")
            .expect_err("chatgpt backend should reject unsupported model");
        assert!(
            err.to_string()
                .contains("not supported when using Codex with a ChatGPT account")
        );
    }
2237
    // API-key backend accepts the pro model.
    #[test]
    fn allows_pro_model_for_api_key_backend() {
        let provider = OpenAiCodexProvider::from_api_key("test-key".to_string());
        provider
            .validate_model_for_backend("gpt-5.4-pro")
            .expect("api key backend should allow pro model");
    }
2245
    // ChatGPT backend accepts "-fast" aliases with thinking-level suffixes.
    #[test]
    fn allows_fast_alias_for_chatgpt_backend() {
        let provider = OpenAiCodexProvider::new();
        provider
            .validate_model_for_backend("gpt-5.4-fast:high")
            .expect("chatgpt backend should allow fast alias");
        provider
            .validate_model_for_backend("gpt-5.5-fast:high")
            .expect("chatgpt backend should allow GPT-5.5 fast alias");
    }
2256
2257 #[test]
2258 fn extracts_chatgpt_account_id_from_jwt_claims() {
2259 let header = URL_SAFE_NO_PAD.encode(r#"{"alg":"none","typ":"JWT"}"#);
2260 let payload = URL_SAFE_NO_PAD
2261 .encode(r#"{"https://api.openai.com/auth":{"chatgpt_account_id":"org_test123"}}"#);
2262 let jwt = format!("{header}.{payload}.sig");
2263
2264 let account_id = OpenAiCodexProvider::extract_chatgpt_account_id(&jwt);
2265 assert_eq!(account_id.as_deref(), Some("org_test123"));
2266 }
2267
2268 #[test]
2269 fn extracts_chatgpt_account_id_from_organizations_claim() {
2270 let header = URL_SAFE_NO_PAD.encode(r#"{"alg":"none","typ":"JWT"}"#);
2271 let payload = URL_SAFE_NO_PAD.encode(r#"{"organizations":[{"id":"org_from_list"}]}"#);
2272 let jwt = format!("{header}.{payload}.sig");
2273
2274 let account_id = OpenAiCodexProvider::extract_chatgpt_account_id(&jwt);
2275 assert_eq!(account_id.as_deref(), Some("org_from_list"));
2276 }
2277
2278 #[test]
2279 fn extracts_responses_instructions_from_system_messages() {
2280 let messages = vec![
2281 Message {
2282 role: Role::System,
2283 content: vec![ContentPart::Text {
2284 text: "first system block".to_string(),
2285 }],
2286 },
2287 Message {
2288 role: Role::User,
2289 content: vec![ContentPart::Text {
2290 text: "user message".to_string(),
2291 }],
2292 },
2293 Message {
2294 role: Role::System,
2295 content: vec![ContentPart::Text {
2296 text: "second system block".to_string(),
2297 }],
2298 },
2299 ];
2300
2301 let instructions = OpenAiCodexProvider::extract_responses_instructions(&messages);
2302 assert_eq!(instructions, "first system block\n\nsecond system block");
2303 }
2304
2305 #[test]
2306 fn falls_back_to_default_responses_instructions_without_system_message() {
2307 let messages = vec![Message {
2308 role: Role::User,
2309 content: vec![ContentPart::Text {
2310 text: "only user".to_string(),
2311 }],
2312 }];
2313
2314 let instructions = OpenAiCodexProvider::extract_responses_instructions(&messages);
2315 assert_eq!(instructions, DEFAULT_RESPONSES_INSTRUCTIONS);
2316 }
2317
2318 #[test]
2319 fn responses_input_ignores_system_messages() {
2320 let messages = vec![
2321 Message {
2322 role: Role::System,
2323 content: vec![ContentPart::Text {
2324 text: "system".to_string(),
2325 }],
2326 },
2327 Message {
2328 role: Role::User,
2329 content: vec![ContentPart::Text {
2330 text: "user".to_string(),
2331 }],
2332 },
2333 ];
2334
2335 let input = OpenAiCodexProvider::convert_messages_to_responses_input(&messages);
2336 assert_eq!(input.len(), 1);
2337 assert_eq!(input[0].get("role").and_then(Value::as_str), Some("user"));
2338 }
2339
2340 #[test]
2341 fn responses_ws_request_uses_bearer_auth() {
2342 let request = OpenAiCodexProvider::build_responses_ws_request_with_base_url(
2343 "wss://example.com/v1/responses",
2344 "test-token",
2345 )
2346 .expect("request should build");
2347
2348 assert_eq!(request.uri().to_string(), "wss://example.com/v1/responses");
2349 assert_eq!(
2350 request
2351 .headers()
2352 .get("Authorization")
2353 .and_then(|v| v.to_str().ok()),
2354 Some("Bearer test-token")
2355 );
2356 assert_eq!(
2357 request
2358 .headers()
2359 .get("User-Agent")
2360 .and_then(|v| v.to_str().ok()),
2361 Some("codetether-responses-ws/1.0")
2362 );
2363 }
2364
2365 #[test]
2366 fn responses_ws_request_includes_chatgpt_account_id_when_provided() {
2367 let request = OpenAiCodexProvider::build_responses_ws_request_with_base_url_and_account_id(
2368 "wss://example.com/v1/responses",
2369 "test-token",
2370 Some("org_123"),
2371 )
2372 .expect("request should build");
2373
2374 assert_eq!(
2375 request
2376 .headers()
2377 .get("chatgpt-account-id")
2378 .and_then(|v| v.to_str().ok()),
2379 Some("org_123")
2380 );
2381 }
2382
2383 #[test]
2384 fn builds_responses_ws_create_event_for_tools() {
2385 let request = CompletionRequest {
2386 messages: vec![
2387 Message {
2388 role: Role::System,
2389 content: vec![ContentPart::Text {
2390 text: "System prompt".to_string(),
2391 }],
2392 },
2393 Message {
2394 role: Role::User,
2395 content: vec![ContentPart::Text {
2396 text: "Inspect the repo".to_string(),
2397 }],
2398 },
2399 ],
2400 tools: vec![ToolDefinition {
2401 name: "read".to_string(),
2402 description: "Read a file".to_string(),
2403 parameters: json!({
2404 "type": "object",
2405 "properties": {
2406 "path": { "type": "string" }
2407 },
2408 "required": ["path"]
2409 }),
2410 }],
2411 model: "gpt-5.4".to_string(),
2412 temperature: None,
2413 top_p: None,
2414 max_tokens: Some(8192),
2415 stop: Vec::new(),
2416 };
2417
2418 let event = OpenAiCodexProvider::build_responses_ws_create_event(
2419 &request,
2420 "gpt-5.4",
2421 Some(ThinkingLevel::High),
2422 None,
2423 );
2424
2425 assert_eq!(
2426 event.get("type").and_then(Value::as_str),
2427 Some("response.create")
2428 );
2429 assert_eq!(event.get("model").and_then(Value::as_str), Some("gpt-5.4"));
2430 assert_eq!(event.get("store").and_then(Value::as_bool), Some(false));
2431 assert_eq!(
2432 event.get("max_output_tokens").and_then(Value::as_u64),
2433 Some(8192)
2434 );
2435 assert_eq!(
2436 event.get("tools").and_then(Value::as_array).map(Vec::len),
2437 Some(1)
2438 );
2439 }
2440
    #[tokio::test]
    async fn responses_ws_connection_round_trips_json_events() {
        // End-to-end handshake plus one JSON event exchanged in each direction
        // over an in-memory duplex pipe (no real network): a fake server task
        // validates the upgrade request, then client and server trade events.
        let (client_io, server_io) = duplex(16 * 1024);

        let server = tokio::spawn(async move {
            // Handshake callback: assert path and bearer token before
            // accepting the websocket upgrade.
            let callback = |request: &ServerRequest, response: ServerResponse| {
                assert_eq!(request.uri().path(), "/v1/responses");
                assert_eq!(
                    request
                        .headers()
                        .get("Authorization")
                        .and_then(|v| v.to_str().ok()),
                    Some("Bearer test-token")
                );
                Ok(response)
            };

            let mut socket = accept_hdr_async(server_io, callback)
                .await
                .expect("server websocket handshake should succeed");
            // The first client frame must be a text frame containing a
            // `response.create` JSON event.
            let message = socket
                .next()
                .await
                .expect("server should receive message")
                .expect("server message should be valid");
            match message {
                WsMessage::Text(text) => {
                    let event: Value =
                        serde_json::from_str(&text).expect("server should parse JSON event");
                    assert_eq!(
                        event.get("type").and_then(Value::as_str),
                        Some("response.create")
                    );
                }
                other => panic!("expected text frame, got {other:?}"),
            }

            // Reply with a minimal `response.created` event for the client to
            // read back.
            socket
                .send(WsMessage::Text(
                    json!({ "type": "response.created" }).to_string().into(),
                ))
                .await
                .expect("server should send response");
        });

        // Client side: build the upgrade request against the duplex pipe. The
        // host in the URL is irrelevant — only the path/headers matter here.
        let request = OpenAiCodexProvider::build_responses_ws_request_with_base_url(
            "ws://localhost/v1/responses",
            "test-token",
        )
        .expect("client request should build");
        let (stream, _) = client_async(request, client_io)
            .await
            .expect("client websocket handshake should succeed");
        let mut connection = OpenAiRealtimeConnection::new(stream);

        // Send one event, read the server's reply, then close cleanly.
        connection
            .send_event(&json!({ "type": "response.create", "model": "gpt-5.4", "input": [] }))
            .await
            .expect("client should send event");
        let event = connection
            .recv_event()
            .await
            .expect("client should read event")
            .expect("client should receive session event");
        assert_eq!(
            event.get("type").and_then(Value::as_str),
            Some("response.created")
        );
        connection
            .close()
            .await
            .expect("client should close cleanly");

        server.await.expect("server task should finish");
    }
2516
    #[test]
    fn responses_sse_parser_buffers_split_tool_call_events() {
        // Feed an SSE stream whose `data:` prefix is split across two reads;
        // the parser must buffer the partial line and resume cleanly on the
        // next chunk.
        let mut parser = ResponsesSseParser::default();

        // First read: one complete `output_item.added` event followed by the
        // dangling prefix "da" of the next event's `data:` line.
        let chunk1 = br#"data: {"type":"response.output_item.added","item":{"type":"function_call","id":"fc_1","call_id":"call_1","name":"bash"}}

da"#;
        let first = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, chunk1);
        assert_eq!(first.len(), 1);
        match &first[0] {
            StreamChunk::ToolCallStart { id, name } => {
                assert_eq!(id, "call_1");
                assert_eq!(name, "bash");
            }
            other => panic!("expected tool start, got {other:?}"),
        }

        // Second read: begins with "ta: " to complete the split "data:" line,
        // then a second arguments delta, the item-done event, and the final
        // `response.completed` event carrying usage totals.
        let chunk2 = br#"ta: {"type":"response.function_call_arguments.delta","item_id":"fc_1","delta":"{\"command\":"}

data: {"type":"response.function_call_arguments.delta","item_id":"fc_1","delta":"\"ls\"}"}

data: {"type":"response.output_item.done","item":{"type":"function_call","id":"fc_1","call_id":"call_1","name":"bash","arguments":"{\"command\":\"ls\"}"}}

data: {"type":"response.completed","response":{"usage":{"input_tokens":11,"output_tokens":7,"total_tokens":18}}}

"#;
        let second = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, chunk2);

        // Expected in order: delta, delta, tool end, done-with-usage.
        assert_eq!(second.len(), 4);
        match &second[0] {
            StreamChunk::ToolCallDelta {
                id,
                arguments_delta,
            } => {
                assert_eq!(id, "call_1");
                assert_eq!(arguments_delta, "{\"command\":");
            }
            other => panic!("expected first tool delta, got {other:?}"),
        }
        match &second[1] {
            StreamChunk::ToolCallDelta {
                id,
                arguments_delta,
            } => {
                assert_eq!(id, "call_1");
                assert_eq!(arguments_delta, "\"ls\"}");
            }
            other => panic!("expected second tool delta, got {other:?}"),
        }
        match &second[2] {
            StreamChunk::ToolCallEnd { id } => assert_eq!(id, "call_1"),
            other => panic!("expected tool end, got {other:?}"),
        }
        match &second[3] {
            StreamChunk::Done { usage } => {
                let usage = usage.as_ref().expect("expected usage");
                assert_eq!(usage.prompt_tokens, 11);
                assert_eq!(usage.completion_tokens, 7);
                assert_eq!(usage.total_tokens, 18);
            }
            other => panic!("expected done, got {other:?}"),
        }
    }
2580
2581 #[test]
2582 fn responses_sse_parser_falls_back_to_done_item_arguments() {
2583 let mut parser = ResponsesSseParser::default();
2584 let bytes = br#"data: {"type":"response.output_item.done","item":{"type":"function_call","id":"fc_2","call_id":"call_2","name":"read","arguments":"{\"path\":\"src/main.rs\"}"}}
2585
2586"#;
2587
2588 let chunks = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, bytes);
2589 assert_eq!(chunks.len(), 3);
2590 match &chunks[0] {
2591 StreamChunk::ToolCallStart { id, name } => {
2592 assert_eq!(id, "call_2");
2593 assert_eq!(name, "read");
2594 }
2595 other => panic!("expected tool start, got {other:?}"),
2596 }
2597 match &chunks[1] {
2598 StreamChunk::ToolCallDelta {
2599 id,
2600 arguments_delta,
2601 } => {
2602 assert_eq!(id, "call_2");
2603 assert_eq!(arguments_delta, "{\"path\":\"src/main.rs\"}");
2604 }
2605 other => panic!("expected tool delta, got {other:?}"),
2606 }
2607 match &chunks[2] {
2608 StreamChunk::ToolCallEnd { id } => assert_eq!(id, "call_2"),
2609 other => panic!("expected tool end, got {other:?}"),
2610 }
2611 }
2612
2613 #[test]
2614 fn responses_sse_parser_flushes_final_event_without_trailing_blank_line() {
2615 let mut parser = ResponsesSseParser::default();
2616 let bytes = br#"data: {"type":"response.output_item.done","item":{"type":"function_call","id":"fc_3","call_id":"call_3","name":"read","arguments":"{\"path\":\"src/lib.rs\"}"}}"#;
2617
2618 let first = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, bytes);
2619 assert!(first.is_empty());
2620
2621 let flushed = OpenAiCodexProvider::finish_responses_sse_parser(&mut parser);
2622 assert_eq!(flushed.len(), 3);
2623 match &flushed[0] {
2624 StreamChunk::ToolCallStart { id, name } => {
2625 assert_eq!(id, "call_3");
2626 assert_eq!(name, "read");
2627 }
2628 other => panic!("expected tool start, got {other:?}"),
2629 }
2630 match &flushed[1] {
2631 StreamChunk::ToolCallDelta {
2632 id,
2633 arguments_delta,
2634 } => {
2635 assert_eq!(id, "call_3");
2636 assert_eq!(arguments_delta, "{\"path\":\"src/lib.rs\"}");
2637 }
2638 other => panic!("expected tool delta, got {other:?}"),
2639 }
2640 match &flushed[2] {
2641 StreamChunk::ToolCallEnd { id } => assert_eq!(id, "call_3"),
2642 other => panic!("expected tool end, got {other:?}"),
2643 }
2644 }
2645
2646 #[tokio::test]
2647 async fn collect_stream_completion_updates_tool_name_after_early_delta() {
2648 let stream = stream::iter(vec![
2649 StreamChunk::ToolCallDelta {
2650 id: "call_4".to_string(),
2651 arguments_delta: "{\"path\":\"src/provider/openai_codex.rs\"}".to_string(),
2652 },
2653 StreamChunk::ToolCallStart {
2654 id: "call_4".to_string(),
2655 name: "read".to_string(),
2656 },
2657 StreamChunk::Done { usage: None },
2658 ]);
2659
2660 let response = OpenAiCodexProvider::collect_stream_completion(Box::pin(stream))
2661 .await
2662 .expect("stream completion should succeed");
2663
2664 assert!(matches!(
2665 response.message.content.first(),
2666 Some(ContentPart::ToolCall { id, name, arguments, .. })
2667 if id == "call_4"
2668 && name == "read"
2669 && arguments == "{\"path\":\"src/provider/openai_codex.rs\"}"
2670 ));
2671 }
2672}