1use super::{
9 CompletionRequest, CompletionResponse, ContentPart, FinishReason, Message, ModelInfo, Provider,
10 Role, StreamChunk, ToolDefinition, Usage,
11};
12use anyhow::{Context, Result};
13use async_trait::async_trait;
14use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
15use futures::stream::BoxStream;
16use futures::{SinkExt, StreamExt};
17use reqwest::{Client, StatusCode};
18use serde::{Deserialize, Serialize};
19use serde_json::{Value, json};
20use sha2::{Digest, Sha256};
21use std::collections::HashMap;
22use std::io::ErrorKind;
23use std::sync::Arc;
24use tokio::io::{AsyncRead, AsyncWrite};
25use tokio::net::TcpStream;
26use tokio::sync::RwLock;
27use tokio_tungstenite::{
28 MaybeTlsStream, WebSocketStream, connect_async,
29 tungstenite::{
30 Error as WsError, Message as WsMessage,
31 client::IntoClientRequest,
32 http::{HeaderValue, Request},
33 },
34};
35
// REST and WebSocket endpoints for the two supported backends: the public
// OpenAI API (static API-key auth) and the ChatGPT Codex backend (OAuth).
const OPENAI_API_URL: &str = "https://api.openai.com/v1";
const CHATGPT_CODEX_API_URL: &str = "https://chatgpt.com/backend-api/codex";
const OPENAI_RESPONSES_WS_URL: &str = "wss://api.openai.com/v1/responses";
const CHATGPT_CODEX_RESPONSES_WS_URL: &str = "wss://chatgpt.com/backend-api/codex/responses";
// OAuth 2.0 (PKCE) endpoints and client configuration for the Codex login flow.
const AUTH_ISSUER: &str = "https://auth.openai.com";
const AUTHORIZE_URL: &str = "https://auth.openai.com/oauth/authorize";
const TOKEN_URL: &str = "https://auth.openai.com/oauth/token";
const CLIENT_ID: &str = "app_EMoamEEZ73f0CkXaXp7hrann";
// Loopback redirect served by a local OAuth callback listener.
const REDIRECT_URI: &str = "http://localhost:1455/auth/callback";
const SCOPE: &str = "openid profile email offline_access";
// Env vars supplying a default reasoning-effort level ("low"/"medium"/"high")
// when the model name carries no `:level` suffix (see `env_thinking_level`).
const THINKING_LEVEL_ENV: &str = "CODETETHER_OPENAI_CODEX_THINKING_LEVEL";
const REASONING_EFFORT_ENV: &str = "CODETETHER_OPENAI_CODEX_REASONING_EFFORT";
// Fallback `instructions` used when a request contains no system messages.
const DEFAULT_RESPONSES_INSTRUCTIONS: &str = "You are a helpful assistant.";
49
/// Reasoning-effort level attached to Responses requests
/// (wire values "low"/"medium"/"high").
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ThinkingLevel {
    Low,
    Medium,
    High,
}
56
/// Which backend a Responses WebSocket connection targets.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ResponsesWsBackend {
    /// Public OpenAI API (`api.openai.com`).
    OpenAi,
    /// ChatGPT Codex backend (`chatgpt.com/backend-api/codex`).
    ChatGptCodex,
}
62
/// Service tier requested via a model alias (currently only "priority",
/// selected by the `gpt-5.4-fast` alias — see `parse_service_tier_model_alias`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CodexServiceTier {
    Priority,
}
67
68impl ThinkingLevel {
69 fn parse(raw: &str) -> Option<Self> {
70 match raw.trim().to_ascii_lowercase().as_str() {
71 "low" => Some(Self::Low),
72 "medium" => Some(Self::Medium),
73 "high" => Some(Self::High),
74 _ => None,
75 }
76 }
77
78 fn as_str(self) -> &'static str {
79 match self {
80 Self::Low => "low",
81 Self::Medium => "medium",
82 Self::High => "high",
83 }
84 }
85}
86
87impl CodexServiceTier {
88 fn as_str(self) -> &'static str {
89 match self {
90 Self::Priority => "priority",
91 }
92 }
93}
94
/// Process-local cache entry for OAuth tokens. Expiry is tracked with a
/// monotonic `Instant` so validity checks are immune to wall-clock jumps.
#[allow(dead_code)]
struct CachedTokens {
    access_token: String,
    #[allow(dead_code)]
    refresh_token: String,
    // Monotonic deadline derived from the server-reported `expires_in`.
    expires_at: std::time::Instant,
}
103
104#[derive(Debug, Clone, Serialize, Deserialize)]
106pub struct OAuthCredentials {
107 #[serde(default)]
108 pub id_token: Option<String>,
109 #[serde(default)]
110 pub chatgpt_account_id: Option<String>,
111 pub access_token: String,
112 pub refresh_token: String,
113 pub expires_at: u64, }
115
/// PKCE verifier/challenge pair (RFC 7636, S256 method).
struct PkcePair {
    verifier: String,
    // base64url(SHA-256(verifier)), sent in the authorize URL.
    challenge: String,
}
121
/// Per-call accumulation state while parsing streamed Responses tool calls.
#[derive(Debug, Default)]
struct ResponsesToolState {
    call_id: String,
    name: Option<String>,
    // Lifecycle flags; `finished` gates finalization (see
    // `finish_responses_tool_call`).
    started: bool,
    finished: bool,
    // Argument text emitted so far — presumably used to de-duplicate
    // streamed argument deltas; confirm against the SSE parser.
    emitted_arguments: String,
}
130
/// Accumulator for parsing a Responses SSE stream: carries partial lines
/// across chunk boundaries and per-tool-call state keyed by call id.
#[derive(Debug, Default)]
struct ResponsesSseParser {
    // Partial line carried over between received chunks.
    line_buffer: String,
    // Data lines collected for the event currently being assembled.
    event_data_lines: Vec<String>,
    tools: HashMap<String, ResponsesToolState>,
}
137
/// Thin wrapper over a (possibly TLS) WebSocket stream carrying JSON events
/// for the OpenAI Responses protocol; see `send_event`/`recv_event`.
pub struct OpenAiRealtimeConnection<S = MaybeTlsStream<TcpStream>> {
    stream: WebSocketStream<S>,
}
141
142impl<S> std::fmt::Debug for OpenAiRealtimeConnection<S> {
143 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144 f.debug_struct("OpenAiRealtimeConnection").finish()
145 }
146}
147
impl<S> OpenAiRealtimeConnection<S>
where
    S: AsyncRead + AsyncWrite + Unpin,
{
    fn new(stream: WebSocketStream<S>) -> Self {
        Self { stream }
    }

    /// Serializes `event` to JSON and sends it as a WebSocket text frame.
    pub async fn send_event(&mut self, event: &Value) -> Result<()> {
        let payload = serde_json::to_string(event).context("Failed to serialize Realtime event")?;
        self.stream
            .send(WsMessage::Text(payload.into()))
            .await
            .context("Failed to send Realtime event")?;
        Ok(())
    }

    /// Receives the next JSON event, transparently answering pings and
    /// skipping pong/raw frames.
    ///
    /// Returns `Ok(None)` once the peer closes the connection (close frame
    /// or end of stream); errors on transport failures or non-JSON payloads.
    pub async fn recv_event(&mut self) -> Result<Option<Value>> {
        while let Some(message) = self.stream.next().await {
            let message = message.context("Realtime WebSocket receive failed")?;
            match message {
                WsMessage::Text(text) => {
                    let event = serde_json::from_str(&text)
                        .context("Failed to parse Realtime text event")?;
                    return Ok(Some(event));
                }
                // JSON may also arrive in binary frames; decode as UTF-8.
                WsMessage::Binary(bytes) => {
                    let text = String::from_utf8(bytes.to_vec())
                        .context("Realtime binary event was not valid UTF-8")?;
                    let event = serde_json::from_str(&text)
                        .context("Failed to parse Realtime binary event")?;
                    return Ok(Some(event));
                }
                // Keep the connection alive while waiting for real events.
                WsMessage::Ping(payload) => {
                    self.stream
                        .send(WsMessage::Pong(payload))
                        .await
                        .context("Failed to respond to Realtime ping")?;
                }
                WsMessage::Pong(_) => {}
                WsMessage::Frame(_) => {}
                WsMessage::Close(_) => return Ok(None),
            }
        }

        Ok(None)
    }

    /// Sends a close frame. "Already closed"-style failures (closed
    /// connection, broken pipe, reset, aborted, not connected) are treated
    /// as success so teardown is idempotent.
    pub async fn close(&mut self) -> Result<()> {
        match self.stream.send(WsMessage::Close(None)).await {
            Ok(()) => {}
            Err(WsError::ConnectionClosed) | Err(WsError::AlreadyClosed) => {}
            Err(WsError::Io(err))
                if matches!(
                    err.kind(),
                    ErrorKind::BrokenPipe
                        | ErrorKind::ConnectionReset
                        | ErrorKind::ConnectionAborted
                        | ErrorKind::NotConnected
                ) => {}
            Err(err) => return Err(err).context("Failed to close Realtime WebSocket"),
        }
        Ok(())
    }
}
213
/// Provider for OpenAI Codex. Authenticates either with a static API key
/// (routes to the public OpenAI API) or with stored OAuth credentials
/// (routes to the ChatGPT Codex backend).
pub struct OpenAiCodexProvider {
    client: Client,
    // Process-local cache of the current access token (OAuth mode only).
    cached_tokens: Arc<RwLock<Option<CachedTokens>>>,
    // When set, used directly as the bearer token and selects the public
    // OpenAI backend (see `using_chatgpt_backend`).
    static_api_key: Option<String>,
    chatgpt_account_id: Option<String>,
    stored_credentials: Option<OAuthCredentials>,
}
222
223impl std::fmt::Debug for OpenAiCodexProvider {
224 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
225 f.debug_struct("OpenAiCodexProvider")
226 .field("has_api_key", &self.static_api_key.is_some())
227 .field("has_chatgpt_account_id", &self.chatgpt_account_id.is_some())
228 .field("has_credentials", &self.stored_credentials.is_some())
229 .finish()
230 }
231}
232
233impl Default for OpenAiCodexProvider {
234 fn default() -> Self {
235 Self::new()
236 }
237}
238
239impl OpenAiCodexProvider {
240 fn chatgpt_supported_models() -> &'static [&'static str] {
241 &[
242 "gpt-5",
243 "gpt-5-mini",
244 "gpt-5.1-codex",
245 "gpt-5.2",
246 "gpt-5.3-codex",
247 "gpt-5.4",
248 "gpt-5.4-fast",
249 "o3",
250 "o4-mini",
251 ]
252 }
253
254 fn model_is_supported_by_backend(&self, model: &str) -> bool {
255 if !self.using_chatgpt_backend() {
256 return true;
257 }
258 Self::chatgpt_supported_models().contains(&model)
259 }
260
261 fn validate_model_for_backend(&self, model: &str) -> Result<()> {
262 let (resolved_model, _, _) =
263 Self::resolve_model_and_reasoning_effort_and_service_tier(model);
264 if self.model_is_supported_by_backend(model)
265 || self.model_is_supported_by_backend(&resolved_model)
266 {
267 return Ok(());
268 }
269
270 if self.using_chatgpt_backend() {
271 anyhow::bail!(
272 "Model '{}' is not supported when using Codex with a ChatGPT account. Supported models: {}",
273 model,
274 Self::chatgpt_supported_models().join(", ")
275 );
276 }
277
278 Ok(())
279 }
280
281 pub fn from_api_key(api_key: String) -> Self {
282 Self {
283 client: crate::provider::shared_http::shared_client().clone(),
284 cached_tokens: Arc::new(RwLock::new(None)),
285 static_api_key: Some(api_key),
286 chatgpt_account_id: None,
287 stored_credentials: None,
288 }
289 }
290
291 pub fn from_credentials(credentials: OAuthCredentials) -> Self {
293 let chatgpt_account_id = credentials
294 .chatgpt_account_id
295 .clone()
296 .or_else(|| {
297 credentials
298 .id_token
299 .as_deref()
300 .and_then(Self::extract_chatgpt_account_id_from_jwt)
301 })
302 .or_else(|| Self::extract_chatgpt_account_id_from_jwt(&credentials.access_token));
303
304 Self {
305 client: crate::provider::shared_http::shared_client().clone(),
306 cached_tokens: Arc::new(RwLock::new(None)),
307 static_api_key: None,
308 chatgpt_account_id,
309 stored_credentials: Some(credentials),
310 }
311 }
312
313 #[allow(dead_code)]
315 pub fn new() -> Self {
316 Self {
317 client: crate::provider::shared_http::shared_client().clone(),
318 cached_tokens: Arc::new(RwLock::new(None)),
319 static_api_key: None,
320 chatgpt_account_id: None,
321 stored_credentials: None,
322 }
323 }
324
    /// Builds a PKCE verifier/challenge pair (RFC 7636, S256 method).
    ///
    /// NOTE(review): the "random" verifier bytes are deterministic mixes of
    /// the current timestamp and thread id — NOT cryptographically secure
    /// randomness. PKCE verifiers should come from a CSPRNG; consider a
    /// proper RNG source here.
    fn generate_pkce() -> PkcePair {
        let random_bytes: [u8; 32] = {
            let timestamp = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_nanos())
                .unwrap_or(0);

            let mut bytes = [0u8; 32];
            let ts_bytes = timestamp.to_le_bytes();
            let tid = std::thread::current().id();
            let tid_repr = format!("{:?}", tid);
            let tid_hash = Sha256::digest(tid_repr.as_bytes());

            // 8 bytes of raw timestamp + 16 bytes of hashed thread id
            // + 8 bytes of hashed timestamp.
            bytes[0..8].copy_from_slice(&ts_bytes[0..8]);
            bytes[8..24].copy_from_slice(&tid_hash[0..16]);
            bytes[24..].copy_from_slice(&Sha256::digest(ts_bytes)[0..8]);
            bytes
        };
        let verifier = URL_SAFE_NO_PAD.encode(random_bytes);

        // challenge = base64url(SHA-256(verifier)), per the S256 method.
        let mut hasher = Sha256::new();
        hasher.update(verifier.as_bytes());
        let challenge_bytes = hasher.finalize();
        let challenge = URL_SAFE_NO_PAD.encode(challenge_bytes);

        PkcePair {
            verifier,
            challenge,
        }
    }
356
357 fn generate_state() -> String {
359 let timestamp = std::time::SystemTime::now()
360 .duration_since(std::time::UNIX_EPOCH)
361 .map(|d| d.as_nanos())
362 .unwrap_or(0);
363 let random: [u8; 8] = {
364 let ptr = Box::into_raw(Box::new(timestamp)) as usize;
365 let bytes = ptr.to_le_bytes();
366 let mut arr = [0u8; 8];
367 arr.copy_from_slice(&bytes);
368 arr
369 };
370 format!("{:016x}{:016x}", timestamp, u64::from_le_bytes(random))
371 }
372
    /// Builds the OAuth authorize URL for the Codex login flow.
    ///
    /// Returns `(authorize_url, pkce_verifier, state)`; the caller must keep
    /// the verifier and state to complete the code exchange and to validate
    /// the callback.
    #[allow(dead_code)]
    pub fn get_authorization_url() -> (String, String, String) {
        let pkce = Self::generate_pkce();
        let state = Self::generate_state();

        // Extra query params (`codex_cli_simplified_flow`, `originator`, …)
        // mirror the official Codex CLI's authorize request.
        let url = format!(
            "{}?response_type=code&client_id={}&redirect_uri={}&scope={}&code_challenge={}&code_challenge_method=S256&state={}&id_token_add_organizations=true&codex_cli_simplified_flow=true&originator=codex_cli_rs",
            AUTHORIZE_URL,
            CLIENT_ID,
            urlencoding::encode(REDIRECT_URI),
            urlencoding::encode(SCOPE),
            pkce.challenge,
            state
        );

        (url, pkce.verifier, state)
    }
391
    /// OAuth issuer base URL used by the Codex login flow.
    pub fn oauth_issuer() -> &'static str {
        AUTH_ISSUER
    }
395
    /// OAuth client id registered for the Codex CLI application.
    pub fn oauth_client_id() -> &'static str {
        CLIENT_ID
    }
399
400 fn responses_ws_url_with_base(base_url: &str) -> String {
401 base_url.to_string()
402 }
403
    /// Responses WebSocket URL for the public OpenAI backend.
    pub fn responses_ws_url() -> String {
        Self::responses_ws_url_with_base(OPENAI_RESPONSES_WS_URL)
    }
407
    /// Builds the HTTP upgrade request for a Responses WebSocket: bearer
    /// auth, a stable User-Agent, and — when provided and non-blank — the
    /// `ChatGPT-Account-ID` header scoping the session to a workspace.
    ///
    /// Errors on an empty/whitespace token or on header values that are not
    /// valid HTTP header content.
    fn build_responses_ws_request_with_base_url_and_account_id(
        base_url: &str,
        token: &str,
        chatgpt_account_id: Option<&str>,
    ) -> Result<Request<()>> {
        if token.trim().is_empty() {
            anyhow::bail!("Responses WebSocket token cannot be empty");
        }

        let url = Self::responses_ws_url_with_base(base_url);
        let mut request = url
            .into_client_request()
            .context("Failed to build Responses WebSocket request")?;
        request.headers_mut().insert(
            "Authorization",
            HeaderValue::from_str(&format!("Bearer {token}"))
                .context("Failed to build Responses Authorization header")?,
        );
        request.headers_mut().insert(
            "User-Agent",
            HeaderValue::from_static("codetether-responses-ws/1.0"),
        );
        // Blank or whitespace-only account ids are treated as absent.
        if let Some(account_id) = chatgpt_account_id.filter(|id| !id.trim().is_empty()) {
            request.headers_mut().insert(
                "ChatGPT-Account-ID",
                HeaderValue::from_str(account_id)
                    .context("Failed to build ChatGPT account header")?,
            );
        }
        Ok(request)
    }
439
    /// Convenience wrapper: builds the WS upgrade request for `base_url`
    /// without a ChatGPT account id header.
    fn build_responses_ws_request_with_base_url(
        base_url: &str,
        token: &str,
    ) -> Result<Request<()>> {
        Self::build_responses_ws_request_with_base_url_and_account_id(base_url, token, None)
    }
446
    /// Builds the WS upgrade request against the public OpenAI Responses
    /// endpoint.
    fn build_responses_ws_request_with_account_id(
        token: &str,
        chatgpt_account_id: Option<&str>,
    ) -> Result<Request<()>> {
        Self::build_responses_ws_request_with_base_url_and_account_id(
            OPENAI_RESPONSES_WS_URL,
            token,
            chatgpt_account_id,
        )
    }
457
    /// Builds the WS upgrade request against the ChatGPT Codex Responses
    /// endpoint.
    fn build_chatgpt_responses_ws_request(
        token: &str,
        chatgpt_account_id: Option<&str>,
    ) -> Result<Request<()>> {
        Self::build_responses_ws_request_with_base_url_and_account_id(
            CHATGPT_CODEX_RESPONSES_WS_URL,
            token,
            chatgpt_account_id,
        )
    }
468
    /// Connects a Responses WebSocket to the selected backend using an
    /// explicit bearer token and optional ChatGPT account id.
    async fn connect_responses_ws_with_token(
        &self,
        token: &str,
        chatgpt_account_id: Option<&str>,
        backend: ResponsesWsBackend,
    ) -> Result<OpenAiRealtimeConnection> {
        let request = match backend {
            ResponsesWsBackend::OpenAi => {
                Self::build_responses_ws_request_with_account_id(token, chatgpt_account_id)?
            }
            ResponsesWsBackend::ChatGptCodex => {
                Self::build_chatgpt_responses_ws_request(token, chatgpt_account_id)?
            }
        };
        let (stream, _response) = connect_async(request)
            .await
            .context("Failed to connect to OpenAI Responses WebSocket")?;
        Ok(OpenAiRealtimeConnection::new(stream))
    }
488
489 pub async fn connect_responses_ws(&self) -> Result<OpenAiRealtimeConnection> {
490 let token = self.get_access_token().await?;
491 let account_id = self.resolved_chatgpt_account_id(&token);
492 let backend = if self.using_chatgpt_backend() {
493 ResponsesWsBackend::ChatGptCodex
494 } else {
495 ResponsesWsBackend::OpenAi
496 };
497 self.connect_responses_ws_with_token(&token, account_id.as_deref(), backend)
498 .await
499 }
500
    /// Exchanges an authorization code using the default loopback redirect
    /// URI.
    #[allow(dead_code)]
    pub async fn exchange_code(code: &str, verifier: &str) -> Result<OAuthCredentials> {
        Self::exchange_code_with_redirect_uri(code, verifier, REDIRECT_URI).await
    }
506
507 pub async fn exchange_code_with_redirect_uri(
509 code: &str,
510 verifier: &str,
511 redirect_uri: &str,
512 ) -> Result<OAuthCredentials> {
513 let client = crate::provider::shared_http::shared_client().clone();
514 let form_body = format!(
515 "grant_type={}&client_id={}&code={}&code_verifier={}&redirect_uri={}",
516 urlencoding::encode("authorization_code"),
517 CLIENT_ID,
518 urlencoding::encode(code),
519 urlencoding::encode(verifier),
520 urlencoding::encode(redirect_uri),
521 );
522
523 let response = client
524 .post(TOKEN_URL)
525 .header("Content-Type", "application/x-www-form-urlencoded")
526 .body(form_body)
527 .send()
528 .await
529 .context("Failed to exchange authorization code")?;
530
531 if !response.status().is_success() {
532 let body = response.text().await.unwrap_or_default();
533 anyhow::bail!("OAuth token exchange failed: {}", body);
534 }
535
536 #[derive(Deserialize)]
537 struct TokenResponse {
538 #[serde(default)]
539 id_token: Option<String>,
540 access_token: String,
541 refresh_token: String,
542 expires_in: u64,
543 }
544
545 let tokens: TokenResponse = response
546 .json()
547 .await
548 .context("Failed to parse token response")?;
549
550 let expires_at = std::time::SystemTime::now()
551 .duration_since(std::time::UNIX_EPOCH)
552 .context("System time error")?
553 .as_secs()
554 + tokens.expires_in;
555
556 let chatgpt_account_id = tokens
557 .id_token
558 .as_deref()
559 .and_then(Self::extract_chatgpt_account_id_from_jwt);
560
561 Ok(OAuthCredentials {
562 id_token: tokens.id_token,
563 chatgpt_account_id,
564 access_token: tokens.access_token,
565 refresh_token: tokens.refresh_token,
566 expires_at,
567 })
568 }
569
    /// Exchanges an OpenID Connect id_token for an OpenAI API key using the
    /// RFC 8693 token-exchange grant.
    ///
    /// # Errors
    /// Fails on transport errors, non-2xx responses (status included in the
    /// message), unparsable responses, or an empty access token.
    pub async fn exchange_id_token_for_api_key(id_token: &str) -> Result<String> {
        #[derive(Deserialize)]
        struct ExchangeResponse {
            access_token: String,
        }

        let client = crate::provider::shared_http::shared_client().clone();
        let form_body = format!(
            "grant_type={}&client_id={}&requested_token={}&subject_token={}&subject_token_type={}",
            urlencoding::encode("urn:ietf:params:oauth:grant-type:token-exchange"),
            urlencoding::encode(CLIENT_ID),
            urlencoding::encode("openai-api-key"),
            urlencoding::encode(id_token),
            urlencoding::encode("urn:ietf:params:oauth:token-type:id_token"),
        );

        let response = client
            .post(TOKEN_URL)
            .header("Content-Type", "application/x-www-form-urlencoded")
            .body(form_body)
            .send()
            .await
            .context("Failed to exchange id_token for OpenAI API key")?;

        let status = response.status();
        if !status.is_success() {
            let body = response.text().await.unwrap_or_default();
            anyhow::bail!("API key token exchange failed ({}): {}", status, body);
        }

        let payload: ExchangeResponse = response
            .json()
            .await
            .context("Failed to parse API key token exchange response")?;
        // Guard against a 2xx response carrying a blank token.
        if payload.access_token.trim().is_empty() {
            anyhow::bail!("API key token exchange returned an empty access token");
        }
        Ok(payload.access_token)
    }
609
610 async fn refresh_access_token(&self, refresh_token: &str) -> Result<OAuthCredentials> {
612 let form_body = format!(
613 "grant_type={}&refresh_token={}&client_id={}",
614 urlencoding::encode("refresh_token"),
615 urlencoding::encode(refresh_token),
616 CLIENT_ID,
617 );
618
619 let response = self
620 .client
621 .post(TOKEN_URL)
622 .header("Content-Type", "application/x-www-form-urlencoded")
623 .body(form_body)
624 .send()
625 .await
626 .context("Failed to refresh access token")?;
627
628 if !response.status().is_success() {
629 let body = response.text().await.unwrap_or_default();
630 anyhow::bail!("Token refresh failed: {}", body);
631 }
632
633 #[derive(Deserialize)]
634 struct TokenResponse {
635 access_token: String,
636 refresh_token: String,
637 expires_in: u64,
638 }
639
640 let tokens: TokenResponse = response
641 .json()
642 .await
643 .context("Failed to parse refresh response")?;
644
645 let expires_at = std::time::SystemTime::now()
646 .duration_since(std::time::UNIX_EPOCH)
647 .context("System time error")?
648 .as_secs()
649 + tokens.expires_in;
650
651 Ok(OAuthCredentials {
652 id_token: None,
653 chatgpt_account_id: self.chatgpt_account_id.clone(),
654 access_token: tokens.access_token,
655 refresh_token: tokens.refresh_token,
656 expires_at,
657 })
658 }
659
    /// Returns a bearer token for API calls.
    ///
    /// Precedence: a static API key (returned as-is) → the process-local
    /// cached OAuth token (while >300s of validity remain) → the stored
    /// credentials, refreshed via the refresh token when within 300s of
    /// expiry. Errors if no credentials are configured.
    async fn get_access_token(&self) -> Result<String> {
        if let Some(ref api_key) = self.static_api_key {
            return Ok(api_key.clone());
        }

        // Fast path under the read lock. `Instant::duration_since` saturates
        // to zero for past instants, so an expired entry falls through to
        // the refresh path below.
        {
            let cache = self.cached_tokens.read().await;
            if let Some(ref tokens) = *cache
                && tokens
                    .expires_at
                    .duration_since(std::time::Instant::now())
                    .as_secs()
                    > 300
            {
                return Ok(tokens.access_token.clone());
            }
        }

        // NOTE(review): the cache is not re-checked after taking the write
        // lock, so two tasks racing past the read check may both refresh.
        let mut cache = self.cached_tokens.write().await;

        let creds = if let Some(ref stored) = self.stored_credentials {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .context("System time error")?
                .as_secs();

            if stored.expires_at > now + 300 {
                stored.clone()
            } else {
                self.refresh_access_token(&stored.refresh_token).await?
            }
        } else {
            anyhow::bail!("No OAuth credentials available. Run OAuth flow first.");
        };

        // Assumes creds.expires_at is still in the future here (both
        // branches above make that likely); a ~0 `expires_in` from the
        // server could underflow this subtraction — TODO confirm.
        let expires_in = creds.expires_at
            - std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .context("System time error")?
                .as_secs();

        // Re-anchor the wall-clock expiry onto the monotonic clock for the
        // cache entry.
        let cached = CachedTokens {
            access_token: creds.access_token.clone(),
            refresh_token: creds.refresh_token.clone(),
            expires_at: std::time::Instant::now() + std::time::Duration::from_secs(expires_in),
        };

        let token = cached.access_token.clone();
        *cache = Some(cached);
        Ok(token)
    }
712
    /// Converts internal messages to Chat Completions `messages` entries.
    ///
    /// Tool messages forward only their FIRST `ToolResult` part (any
    /// additional parts are dropped — NOTE(review): confirm callers never
    /// batch multiple results into one Tool message). Assistant messages
    /// join text parts with no separator and attach tool calls; other roles
    /// join text parts with newlines.
    #[allow(dead_code)]
    fn convert_messages(messages: &[Message]) -> Vec<Value> {
        messages
            .iter()
            .map(|msg| {
                let role = match msg.role {
                    Role::System => "system",
                    Role::User => "user",
                    Role::Assistant => "assistant",
                    Role::Tool => "tool",
                };

                match msg.role {
                    Role::Tool => {
                        if let Some(ContentPart::ToolResult {
                            tool_call_id,
                            content,
                        }) = msg.content.first()
                        {
                            json!({
                                "role": "tool",
                                "tool_call_id": tool_call_id,
                                "content": content
                            })
                        } else {
                            // Tool message without a ToolResult part:
                            // emit an empty-content placeholder.
                            json!({ "role": role, "content": "" })
                        }
                    }
                    Role::Assistant => {
                        let text: String = msg
                            .content
                            .iter()
                            .filter_map(|p| match p {
                                ContentPart::Text { text } => Some(text.clone()),
                                _ => None,
                            })
                            .collect::<Vec<_>>()
                            .join("");

                        let tool_calls: Vec<Value> = msg
                            .content
                            .iter()
                            .filter_map(|p| match p {
                                ContentPart::ToolCall {
                                    id,
                                    name,
                                    arguments,
                                    ..
                                } => Some(json!({
                                    "id": id,
                                    "type": "function",
                                    "function": {
                                        "name": name,
                                        "arguments": arguments
                                    }
                                })),
                                _ => None,
                            })
                            .collect();

                        if tool_calls.is_empty() {
                            json!({ "role": "assistant", "content": text })
                        } else {
                            // With tool calls present, empty text becomes
                            // JSON null rather than "".
                            json!({
                                "role": "assistant",
                                "content": if text.is_empty() { Value::Null } else { json!(text) },
                                "tool_calls": tool_calls
                            })
                        }
                    }
                    _ => {
                        // System/User: text parts joined with newlines.
                        let text: String = msg
                            .content
                            .iter()
                            .filter_map(|p| match p {
                                ContentPart::Text { text } => Some(text.clone()),
                                _ => None,
                            })
                            .collect::<Vec<_>>()
                            .join("\n");
                        json!({ "role": role, "content": text })
                    }
                }
            })
            .collect()
    }
799
800 #[allow(dead_code)]
801 fn convert_tools(tools: &[ToolDefinition]) -> Vec<Value> {
802 tools
803 .iter()
804 .map(|t| {
805 json!({
806 "type": "function",
807 "function": {
808 "name": t.name,
809 "description": t.description,
810 "parameters": t.parameters
811 }
812 })
813 })
814 .collect()
815 }
816
817 fn convert_responses_tools(tools: &[ToolDefinition]) -> Vec<Value> {
818 tools
819 .iter()
820 .map(|t| {
821 json!({
822 "type": "function",
823 "name": t.name,
824 "description": t.description,
825 "strict": false,
826 "parameters": t.parameters
827 })
828 })
829 .collect()
830 }
831
832 fn extract_responses_instructions(messages: &[Message]) -> String {
833 let instructions = messages
834 .iter()
835 .filter(|msg| matches!(msg.role, Role::System))
836 .filter_map(|msg| {
837 let text = msg
838 .content
839 .iter()
840 .filter_map(|p| match p {
841 ContentPart::Text { text } => Some(text.clone()),
842 _ => None,
843 })
844 .collect::<Vec<_>>()
845 .join("\n");
846 (!text.is_empty()).then_some(text)
847 })
848 .collect::<Vec<_>>()
849 .join("\n\n");
850
851 if instructions.is_empty() {
852 DEFAULT_RESPONSES_INSTRUCTIONS.to_string()
853 } else {
854 instructions
855 }
856 }
857
    /// Converts internal messages to the Responses-API `input` array.
    ///
    /// System messages are skipped (they travel as `instructions`). Tool
    /// results are emitted only when a matching `function_call` was seen
    /// earlier in the conversation; orphans are dropped with a warning, as
    /// the API rejects outputs without a paired call.
    fn convert_messages_to_responses_input(messages: &[Message]) -> Vec<Value> {
        let mut input = Vec::new();
        let mut known_tool_call_ids = std::collections::HashSet::new();

        for msg in messages {
            match msg.role {
                Role::System => {}
                Role::User => {
                    // Text parts joined with newlines; empty messages are
                    // omitted entirely.
                    let text: String = msg
                        .content
                        .iter()
                        .filter_map(|p| match p {
                            ContentPart::Text { text } => Some(text.clone()),
                            _ => None,
                        })
                        .collect::<Vec<_>>()
                        .join("\n");
                    if !text.is_empty() {
                        input.push(json!({
                            "type": "message",
                            "role": "user",
                            "content": [{
                                "type": "input_text",
                                "text": text
                            }]
                        }));
                    }
                }
                Role::Assistant => {
                    // Assistant text parts join with no separator.
                    let text: String = msg
                        .content
                        .iter()
                        .filter_map(|p| match p {
                            ContentPart::Text { text } => Some(text.clone()),
                            _ => None,
                        })
                        .collect::<Vec<_>>()
                        .join("");

                    if !text.is_empty() {
                        input.push(json!({
                            "type": "message",
                            "role": "assistant",
                            "content": [{
                                "type": "output_text",
                                "text": text
                            }]
                        }));
                    }

                    // Replay the assistant's tool calls, remembering ids so
                    // later tool results can be validated.
                    for part in &msg.content {
                        if let ContentPart::ToolCall {
                            id,
                            name,
                            arguments,
                            ..
                        } = part
                        {
                            known_tool_call_ids.insert(id.clone());
                            input.push(json!({
                                "type": "function_call",
                                "call_id": id,
                                "name": name,
                                "arguments": arguments,
                            }));
                        }
                    }
                }
                Role::Tool => {
                    for part in &msg.content {
                        if let ContentPart::ToolResult {
                            tool_call_id,
                            content,
                        } = part
                        {
                            if known_tool_call_ids.contains(tool_call_id) {
                                input.push(json!({
                                    "type": "function_call_output",
                                    "call_id": tool_call_id,
                                    "output": content,
                                }));
                            } else {
                                tracing::warn!(
                                    tool_call_id = %tool_call_id,
                                    "Skipping orphaned function_call_output while building Codex responses input"
                                );
                            }
                        }
                    }
                }
            }
        }

        input
    }
953
    /// True when requests should use the ChatGPT Codex backend (OAuth mode);
    /// false when a static API key routes through the public OpenAI API.
    fn using_chatgpt_backend(&self) -> bool {
        self.static_api_key.is_none()
    }
957
    /// Decodes a JWT payload (no signature verification) and extracts a
    /// ChatGPT account id, trying in order: the `https://api.openai.com/auth`
    /// claim object, a top-level `chatgpt_account_id` claim, then the `id`
    /// of the first `organizations` entry. Returns `None` if the token is
    /// malformed or carries none of these claims.
    fn extract_chatgpt_account_id_from_jwt(jwt: &str) -> Option<String> {
        // JWT = header.payload.signature; the payload is base64url, unpadded.
        let payload_b64 = jwt.split('.').nth(1)?;
        let payload = URL_SAFE_NO_PAD.decode(payload_b64).ok()?;
        let value: Value = serde_json::from_slice(&payload).ok()?;

        value
            .get("https://api.openai.com/auth")
            .and_then(Value::as_object)
            .and_then(|auth| auth.get("chatgpt_account_id"))
            .and_then(Value::as_str)
            .map(|s| s.to_string())
            .or_else(|| {
                value
                    .get("chatgpt_account_id")
                    .and_then(Value::as_str)
                    .map(|s| s.to_string())
            })
            .or_else(|| {
                value
                    .get("organizations")
                    .and_then(Value::as_array)
                    .and_then(|orgs| orgs.first())
                    .and_then(|org| org.get("id"))
                    .and_then(Value::as_str)
                    .map(|s| s.to_string())
            })
    }
985
    /// Public helper: extracts a ChatGPT account id embedded in a JWT
    /// access/id token, if any.
    pub fn extract_chatgpt_account_id(token: &str) -> Option<String> {
        Self::extract_chatgpt_account_id_from_jwt(token)
    }
989
    /// Best-effort account-id resolution: the provider's explicit field,
    /// then claims in the live access token, then stored id_token claims,
    /// then the stored credentials' explicit field.
    fn resolved_chatgpt_account_id(&self, access_token: &str) -> Option<String> {
        self.chatgpt_account_id
            .clone()
            .or_else(|| Self::extract_chatgpt_account_id_from_jwt(access_token))
            .or_else(|| {
                self.stored_credentials.as_ref().and_then(|creds| {
                    creds
                        .id_token
                        .as_deref()
                        .and_then(Self::extract_chatgpt_account_id_from_jwt)
                        .or_else(|| creds.chatgpt_account_id.clone())
                })
            })
    }
1004
    /// Normalizes a usage object (accepting both `prompt_tokens`/
    /// `completion_tokens` and `input_tokens`/`output_tokens` naming) into
    /// our `Usage`. Cached prompt tokens are reported separately via
    /// `cache_read_tokens` and subtracted from the prompt count.
    fn parse_responses_usage(usage: Option<&Value>) -> Usage {
        let Some(usage) = usage else {
            return Usage::default();
        };

        let prompt_tokens = usage
            .get("prompt_tokens")
            .or_else(|| usage.get("input_tokens"))
            .and_then(Value::as_u64)
            .unwrap_or(0) as usize;

        let completion_tokens = usage
            .get("completion_tokens")
            .or_else(|| usage.get("output_tokens"))
            .and_then(Value::as_u64)
            .unwrap_or(0) as usize;

        // Fall back to prompt + completion when no explicit total is given.
        let total_tokens = usage
            .get("total_tokens")
            .and_then(Value::as_u64)
            .unwrap_or((prompt_tokens + completion_tokens) as u64)
            as usize;

        let cached_tokens = usage
            .get("prompt_tokens_details")
            .or_else(|| usage.get("input_tokens_details"))
            .and_then(|d| d.get("cached_tokens"))
            .and_then(Value::as_u64)
            .unwrap_or(0) as usize;

        Usage {
            // saturating_sub guards against a reported cached count larger
            // than the prompt count.
            prompt_tokens: prompt_tokens.saturating_sub(cached_tokens),
            completion_tokens,
            total_tokens,
            cache_read_tokens: if cached_tokens > 0 {
                Some(cached_tokens)
            } else {
                None
            },
            cache_write_tokens: None,
        }
    }
1053
1054 fn extract_json_string(value: Option<&Value>) -> Option<String> {
1055 match value {
1056 Some(Value::String(text)) => Some(text.clone()),
1057 Some(other) => serde_json::to_string(other).ok(),
1058 None => None,
1059 }
1060 }
1061
    /// Converts a Responses `output` array into content parts, returning
    /// `(parts, has_tool_calls)`. Message items contribute joined
    /// `output_text`/`text` segments; `function_call` items become tool
    /// calls (skipped when call id, name, or arguments are missing); other
    /// item types are ignored.
    #[allow(dead_code)]
    fn parse_responses_output_parts(output: &[Value]) -> (Vec<ContentPart>, bool) {
        let mut parts = Vec::new();
        let mut has_tool_calls = false;

        for item in output {
            match item.get("type").and_then(Value::as_str) {
                Some("message") => {
                    if let Some(content) = item.get("content").and_then(Value::as_array) {
                        let text = content
                            .iter()
                            .filter(|segment| {
                                matches!(
                                    segment.get("type").and_then(Value::as_str),
                                    Some("output_text") | Some("text")
                                )
                            })
                            .filter_map(|segment| {
                                segment
                                    .get("text")
                                    .and_then(Value::as_str)
                                    .map(str::to_string)
                            })
                            .collect::<Vec<_>>()
                            .join("");
                        if !text.is_empty() {
                            parts.push(ContentPart::Text { text });
                        }
                    }
                }
                Some("function_call") => {
                    // Prefer the `call_id` field; fall back to `id`.
                    let call_id = item
                        .get("call_id")
                        .or_else(|| item.get("id"))
                        .and_then(Value::as_str);
                    let name = item.get("name").and_then(Value::as_str);
                    let arguments = item.get("arguments").and_then(Value::as_str);
                    if let (Some(call_id), Some(name), Some(arguments)) = (call_id, name, arguments)
                    {
                        has_tool_calls = true;
                        parts.push(ContentPart::ToolCall {
                            id: call_id.to_string(),
                            name: name.to_string(),
                            arguments: arguments.to_string(),
                            thought_signature: None,
                        });
                    }
                }
                _ => {}
            }
        }

        (parts, has_tool_calls)
    }
1116
1117 fn parse_model_thinking_level(model: &str) -> (String, Option<ThinkingLevel>) {
1118 let Some((base_model, level_str)) = model.rsplit_once(':') else {
1119 return (model.to_string(), None);
1120 };
1121 let Some(level) = ThinkingLevel::parse(level_str) else {
1122 return (model.to_string(), None);
1123 };
1124 if base_model.trim().is_empty() {
1125 return (model.to_string(), None);
1126 }
1127 (base_model.to_string(), Some(level))
1128 }
1129
    /// Reads a default reasoning-effort level from the environment,
    /// preferring `THINKING_LEVEL_ENV` over `REASONING_EFFORT_ENV`;
    /// unparsable values yield `None`.
    fn env_thinking_level() -> Option<ThinkingLevel> {
        std::env::var(THINKING_LEVEL_ENV)
            .ok()
            .or_else(|| std::env::var(REASONING_EFFORT_ENV).ok())
            .as_deref()
            .and_then(ThinkingLevel::parse)
    }
1137
1138 fn model_supports_reasoning_effort(model: &str) -> bool {
1139 let normalized = model.to_ascii_lowercase();
1140 normalized.starts_with("gpt-5")
1141 || normalized.starts_with("o1")
1142 || normalized.starts_with("o3")
1143 || normalized.starts_with("o4")
1144 }
1145
1146 fn parse_service_tier_model_alias(model: &str) -> (String, Option<CodexServiceTier>) {
1147 match model {
1148 "gpt-5.4-fast" => ("gpt-5.4".to_string(), Some(CodexServiceTier::Priority)),
1150 _ => (model.to_string(), None),
1151 }
1152 }
1153
    /// Convenience wrapper around
    /// `resolve_model_and_reasoning_effort_and_service_tier` that discards
    /// the service tier.
    fn resolve_model_and_reasoning_effort(model: &str) -> (String, Option<ThinkingLevel>) {
        let (base_model, level, _) =
            Self::resolve_model_and_reasoning_effort_and_service_tier(model);
        (base_model, level)
    }
1159
    /// Resolves a user-facing model string into (base model, reasoning
    /// effort, service tier).
    ///
    /// A `:level` suffix on the model wins over the env-var default; the
    /// tier alias is resolved after the suffix is stripped; and the effort
    /// is dropped entirely for models that don't support it.
    fn resolve_model_and_reasoning_effort_and_service_tier(
        model: &str,
    ) -> (String, Option<ThinkingLevel>, Option<CodexServiceTier>) {
        let (base_model, level_from_model) = Self::parse_model_thinking_level(model);
        let level = level_from_model.or_else(Self::env_thinking_level);
        let (base_model, service_tier) = Self::parse_service_tier_model_alias(&base_model);
        if !Self::model_supports_reasoning_effort(&base_model) {
            return (base_model, None, service_tier);
        }
        (base_model, level, service_tier)
    }
1171
1172 fn apply_service_tier(payload: &mut Value, service_tier: Option<CodexServiceTier>) {
1173 if let Some(service_tier) = service_tier {
1174 payload["service_tier"] = json!(service_tier.as_str());
1175 }
1176 }
1177
    /// Translates known OpenAI API failure bodies into actionable messages;
    /// falls back to a generic status+body message.
    fn format_openai_api_error(status: StatusCode, body: &str, model: &str) -> String {
        // 401 with a missing `model.request` scope: the OAuth grant lacks
        // model access for this org/project.
        if status == StatusCode::UNAUTHORIZED && body.contains("Missing scopes: model.request") {
            return format!(
                "OpenAI Codex OAuth token is missing required scope `model.request` for model `{model}`.\n\
                Re-run `codetether auth codex` and complete OAuth approval with a ChatGPT subscription account \
                that has model access in your org/project."
            );
        }
        // 401 mentioning account/workspace headers: no ChatGPT workspace id
        // was resolved for this session.
        if status == StatusCode::UNAUTHORIZED
            && (body.contains("chatgpt-account-id")
                || body.contains("ChatGPT-Account-ID")
                || body.contains("workspace"))
        {
            return "OpenAI Codex auth is missing a ChatGPT workspace/account identifier.\n\
                Re-run `codetether auth codex --device-code` and sign in to the intended workspace."
                .to_string();
        }
        format!("OpenAI API error ({status}): {body}")
    }
1197
1198 fn build_responses_ws_create_event(
1199 request: &CompletionRequest,
1200 model: &str,
1201 reasoning_effort: Option<ThinkingLevel>,
1202 service_tier: Option<CodexServiceTier>,
1203 ) -> Value {
1204 Self::build_responses_ws_create_event_for_backend(
1205 request,
1206 model,
1207 reasoning_effort,
1208 service_tier,
1209 ResponsesWsBackend::OpenAi,
1210 )
1211 }
1212
1213 fn build_responses_ws_create_event_for_backend(
1214 request: &CompletionRequest,
1215 model: &str,
1216 reasoning_effort: Option<ThinkingLevel>,
1217 service_tier: Option<CodexServiceTier>,
1218 backend: ResponsesWsBackend,
1219 ) -> Value {
1220 let instructions = Self::extract_responses_instructions(&request.messages);
1221 let input = Self::convert_messages_to_responses_input(&request.messages);
1222 let tools = Self::convert_responses_tools(&request.tools);
1223
1224 let mut event = json!({
1225 "type": "response.create",
1226 "model": model,
1227 "store": false,
1228 "instructions": instructions,
1229 "input": input,
1230 });
1231
1232 if !tools.is_empty() {
1233 event["tools"] = json!(tools);
1234 }
1235 if let Some(level) = reasoning_effort {
1236 event["reasoning"] = json!({ "effort": level.as_str() });
1237 }
1238 Self::apply_service_tier(&mut event, service_tier);
1239 if backend == ResponsesWsBackend::OpenAi {
1240 event["tool_choice"] = json!("auto");
1241 event["parallel_tool_calls"] = json!(true);
1242 }
1243 if backend == ResponsesWsBackend::OpenAi
1244 && let Some(max_tokens) = request.max_tokens
1245 {
1246 event["max_output_tokens"] = json!(max_tokens);
1247 }
1248
1249 event
1250 }
1251
1252 fn finish_responses_tool_call(
1253 parser: &mut ResponsesSseParser,
1254 key: &str,
1255 chunks: &mut Vec<StreamChunk>,
1256 ) {
1257 if let Some(state) = parser.tools.get_mut(key)
1258 && !state.finished
1259 {
1260 chunks.push(StreamChunk::ToolCallEnd {
1261 id: state.call_id.clone(),
1262 });
1263 state.finished = true;
1264 }
1265 }
1266
1267 fn parse_responses_event(
1268 parser: &mut ResponsesSseParser,
1269 event: &Value,
1270 chunks: &mut Vec<StreamChunk>,
1271 ) {
1272 match event.get("type").and_then(Value::as_str) {
1273 Some("response.output_text.delta") => {
1274 if let Some(delta) = event.get("delta").and_then(Value::as_str) {
1275 chunks.push(StreamChunk::Text(delta.to_string()));
1276 }
1277 }
1278 Some("response.output_item.added") => {
1279 if let Some(item) = event.get("item") {
1280 Self::record_responses_tool_item(parser, item, chunks, false);
1281 }
1282 }
1283 Some("response.function_call_arguments.delta") => {
1284 Self::record_responses_tool_arguments(parser, &event, chunks, false);
1285 }
1286 Some("response.function_call_arguments.done") => {
1287 Self::record_responses_tool_arguments(parser, &event, chunks, true);
1288 }
1289 Some("response.output_item.done") => {
1290 if let Some(item) = event.get("item")
1291 && item.get("type").and_then(Value::as_str) == Some("function_call")
1292 && let Some(key) = item
1293 .get("id")
1294 .and_then(Value::as_str)
1295 .or_else(|| item.get("call_id").and_then(Value::as_str))
1296 {
1297 Self::record_responses_tool_item(parser, item, chunks, true);
1298 Self::finish_responses_tool_call(parser, key, chunks);
1299 }
1300 }
1301 Some("response.completed") | Some("response.done") => {
1302 if let Some(output) = event
1303 .get("response")
1304 .and_then(|response| response.get("output"))
1305 .and_then(Value::as_array)
1306 {
1307 for item in output {
1308 if item.get("type").and_then(Value::as_str) == Some("function_call")
1309 && let Some(key) = item
1310 .get("id")
1311 .and_then(Value::as_str)
1312 .or_else(|| item.get("call_id").and_then(Value::as_str))
1313 {
1314 Self::record_responses_tool_item(parser, item, chunks, true);
1315 Self::finish_responses_tool_call(parser, key, chunks);
1316 }
1317 }
1318 }
1319 let failed_message = event
1320 .get("response")
1321 .and_then(|response| response.get("status"))
1322 .and_then(Value::as_str)
1323 .filter(|status| matches!(*status, "failed" | "cancelled" | "incomplete"))
1324 .map(|_| {
1325 event
1326 .get("response")
1327 .and_then(|response| response.get("error"))
1328 .and_then(|error| error.get("message"))
1329 .and_then(Value::as_str)
1330 .unwrap_or("Response failed")
1331 .to_string()
1332 });
1333 if let Some(message) = failed_message {
1334 chunks.push(StreamChunk::Error(message));
1335 return;
1336 }
1337 let usage = event
1338 .get("response")
1339 .and_then(|response| response.get("usage"))
1340 .map(|usage| Self::parse_responses_usage(Some(usage)));
1341 chunks.push(StreamChunk::Done { usage });
1342 }
1343 Some("response.failed") => {
1344 let message = event
1345 .get("response")
1346 .and_then(|response| response.get("error"))
1347 .and_then(|error| error.get("message"))
1348 .or_else(|| event.get("error").and_then(|error| error.get("message")))
1349 .and_then(Value::as_str)
1350 .unwrap_or("Response failed")
1351 .to_string();
1352 chunks.push(StreamChunk::Error(message));
1353 }
1354 Some("error") => {
1355 let message = event
1356 .get("error")
1357 .and_then(|error| error.get("message"))
1358 .or_else(|| event.get("message"))
1359 .and_then(Value::as_str)
1360 .unwrap_or("Realtime error")
1361 .to_string();
1362 chunks.push(StreamChunk::Error(message));
1363 }
1364 _ => {}
1365 }
1366 }
1367
1368 fn parse_responses_sse_event(
1369 parser: &mut ResponsesSseParser,
1370 data: &str,
1371 chunks: &mut Vec<StreamChunk>,
1372 ) {
1373 if data == "[DONE]" {
1374 chunks.push(StreamChunk::Done { usage: None });
1375 return;
1376 }
1377
1378 let Ok(event): Result<Value, _> = serde_json::from_str(data) else {
1379 return;
1380 };
1381
1382 Self::parse_responses_event(parser, &event, chunks);
1383 }
1384
    /// Records a `function_call` output item into per-call parser state and
    /// emits a `ToolCallStart` (once) plus any unseen argument delta.
    ///
    /// The state map is keyed by the item `id` when present, falling back to
    /// `call_id`; the emitted chunk id always prefers `call_id`. Returns the
    /// call id on success, `None` for non-function-call items or items with
    /// neither identifier. `include_arguments` pulls the item's final
    /// `arguments` string (used for `*.done` events).
    fn record_responses_tool_item(
        parser: &mut ResponsesSseParser,
        item: &Value,
        chunks: &mut Vec<StreamChunk>,
        include_arguments: bool,
    ) -> Option<String> {
        if item.get("type").and_then(Value::as_str) != Some("function_call") {
            return None;
        }

        let key = item
            .get("id")
            .and_then(Value::as_str)
            .or_else(|| item.get("call_id").and_then(Value::as_str))?;
        let call_id = item
            .get("call_id")
            .and_then(Value::as_str)
            .or_else(|| item.get("id").and_then(Value::as_str))?
            .to_string();
        let name = item.get("name").and_then(Value::as_str).map(str::to_string);
        let arguments = if include_arguments {
            Self::extract_json_string(item.get("arguments"))
        } else {
            None
        };

        let state = parser
            .tools
            .entry(key.to_string())
            .or_insert_with(|| ResponsesToolState {
                call_id: call_id.clone(),
                ..ResponsesToolState::default()
            });
        // Backfill identity fields if an earlier event created the entry
        // without them (e.g. an argument delta arriving first).
        if state.call_id.is_empty() {
            state.call_id = call_id.clone();
        }
        if state.name.is_none() {
            state.name = name;
        }
        // Start the call only once, and only once we know its tool name.
        if !state.started
            && let Some(name) = state.name.clone()
        {
            chunks.push(StreamChunk::ToolCallStart {
                id: state.call_id.clone(),
                name,
            });
            state.started = true;
        }
        if let Some(arguments) = arguments {
            Self::emit_missing_responses_tool_arguments(state, arguments, chunks);
        }

        Some(state.call_id.clone())
    }
1439
    /// Folds a `function_call_arguments.{delta,done}` event into per-call
    /// state, emitting `ToolCallStart`/`ToolCallDelta` chunks as needed.
    ///
    /// `use_final_arguments` selects the event's complete `arguments` payload
    /// (done events) over the incremental `delta` field. Events with neither
    /// an `item_id` nor a `call_id` are ignored.
    fn record_responses_tool_arguments(
        parser: &mut ResponsesSseParser,
        event: &Value,
        chunks: &mut Vec<StreamChunk>,
        use_final_arguments: bool,
    ) {
        // State is keyed by item_id when present, falling back to call_id —
        // mirroring record_responses_tool_item's keying.
        let key = event
            .get("item_id")
            .and_then(Value::as_str)
            .or_else(|| event.get("call_id").and_then(Value::as_str));
        let Some(key) = key else {
            return;
        };

        let fallback_call_id = event
            .get("call_id")
            .and_then(Value::as_str)
            .unwrap_or(key)
            .to_string();
        let state = parser
            .tools
            .entry(key.to_string())
            .or_insert_with(|| ResponsesToolState {
                call_id: fallback_call_id.clone(),
                ..ResponsesToolState::default()
            });
        if state.call_id.is_empty() {
            state.call_id = fallback_call_id;
        }

        // An argument event can arrive before the output_item.added that
        // names the tool; start the call here if this event carries a name.
        if !state.started
            && let Some(name) = event.get("name").and_then(Value::as_str)
        {
            state.name = Some(name.to_string());
            chunks.push(StreamChunk::ToolCallStart {
                id: state.call_id.clone(),
                name: name.to_string(),
            });
            state.started = true;
        }

        let arguments = if use_final_arguments {
            // Prefer the complete arguments string; fall back to the delta.
            Self::extract_json_string(event.get("arguments")).or_else(|| {
                event
                    .get("delta")
                    .and_then(Value::as_str)
                    .map(str::to_string)
            })
        } else {
            event
                .get("delta")
                .and_then(Value::as_str)
                .map(str::to_string)
        };

        if let Some(arguments) = arguments {
            Self::emit_missing_responses_tool_arguments(state, arguments, chunks);
        }
    }
1499
1500 fn emit_missing_responses_tool_arguments(
1501 state: &mut ResponsesToolState,
1502 arguments: String,
1503 chunks: &mut Vec<StreamChunk>,
1504 ) {
1505 let delta = if arguments.starts_with(&state.emitted_arguments) {
1506 arguments[state.emitted_arguments.len()..].to_string()
1507 } else if state.emitted_arguments.is_empty() {
1508 arguments.clone()
1509 } else if arguments == state.emitted_arguments {
1510 String::new()
1511 } else {
1512 arguments.clone()
1513 };
1514
1515 if !delta.is_empty() {
1516 chunks.push(StreamChunk::ToolCallDelta {
1517 id: state.call_id.clone(),
1518 arguments_delta: delta.clone(),
1519 });
1520 state.emitted_arguments.push_str(&delta);
1521 }
1522 }
1523
    /// Feeds raw SSE bytes into the parser and returns any chunks produced by
    /// complete events. Incomplete trailing lines stay buffered for the next
    /// call; bytes are decoded lossily as UTF-8.
    fn parse_responses_sse_bytes(
        parser: &mut ResponsesSseParser,
        bytes: &[u8],
    ) -> Vec<StreamChunk> {
        parser.line_buffer.push_str(&String::from_utf8_lossy(bytes));
        let mut chunks = Vec::new();

        // Consume one complete line per iteration ('\n' gives a byte index,
        // which is what drain expects).
        while let Some(line_end) = parser.line_buffer.find('\n') {
            let mut line = parser.line_buffer[..line_end].to_string();
            parser.line_buffer.drain(..=line_end);

            // Tolerate CRLF line endings.
            if line.ends_with('\r') {
                line.pop();
            }

            if line.is_empty() {
                // A blank line terminates an SSE event: dispatch what we have.
                if !parser.event_data_lines.is_empty() {
                    let data = parser.event_data_lines.join("\n");
                    parser.event_data_lines.clear();
                    Self::parse_responses_sse_event(parser, &data, &mut chunks);
                }
                continue;
            }

            // Only `data:` fields matter; other SSE fields (event:, id:, retry:)
            // are ignored. A single leading space after the colon is optional.
            if let Some(data) = line.strip_prefix("data:") {
                let data = data.strip_prefix(' ').unwrap_or(data);
                parser.event_data_lines.push(data.to_string());
            }
        }

        chunks
    }
1556
1557 fn finish_responses_sse_parser(parser: &mut ResponsesSseParser) -> Vec<StreamChunk> {
1558 let mut chunks = Vec::new();
1559
1560 if !parser.line_buffer.is_empty() {
1561 let mut line = std::mem::take(&mut parser.line_buffer);
1562 if line.ends_with('\r') {
1563 line.pop();
1564 }
1565 if let Some(data) = line.strip_prefix("data:") {
1566 let data = data.strip_prefix(' ').unwrap_or(data);
1567 parser.event_data_lines.push(data.to_string());
1568 }
1569 }
1570
1571 if !parser.event_data_lines.is_empty() {
1572 let data = parser.event_data_lines.join("\n");
1573 parser.event_data_lines.clear();
1574 Self::parse_responses_sse_event(parser, &data, &mut chunks);
1575 }
1576
1577 chunks
1578 }
1579
1580 async fn complete_with_chatgpt_responses(
1581 &self,
1582 request: CompletionRequest,
1583 access_token: String,
1584 ) -> Result<CompletionResponse> {
1585 let stream = self
1586 .complete_stream_with_chatgpt_responses(request, access_token)
1587 .await?;
1588 Self::collect_stream_completion(stream).await
1589 }
1590
1591 async fn complete_with_openai_responses(
1592 &self,
1593 request: CompletionRequest,
1594 api_key: String,
1595 ) -> Result<CompletionResponse> {
1596 let stream = self
1597 .complete_stream_with_openai_responses(request, api_key)
1598 .await?;
1599 Self::collect_stream_completion(stream).await
1600 }
1601
    /// Collapses a chunk stream into one `CompletionResponse`: text deltas
    /// are concatenated, tool-call fragments are stitched together per call
    /// id, and the last reported usage wins. A `StreamChunk::Error` aborts
    /// with that message as the error.
    async fn collect_stream_completion(
        mut stream: BoxStream<'static, StreamChunk>,
    ) -> Result<CompletionResponse> {
        // Per-call accumulator; position in `tools` is tracked via
        // `tool_index_by_id` so output order follows first appearance.
        #[derive(Default)]
        struct ToolAccumulator {
            id: String,
            name: String,
            arguments: String,
        }

        let mut text = String::new();
        let mut tools = Vec::<ToolAccumulator>::new();
        let mut tool_index_by_id = HashMap::<String, usize>::new();
        let mut usage = Usage::default();

        while let Some(chunk) = stream.next().await {
            match chunk {
                StreamChunk::Text(delta) => text.push_str(&delta),
                StreamChunk::ToolCallStart { id, name } => {
                    let next_idx = tools.len();
                    let idx = *tool_index_by_id.entry(id.clone()).or_insert(next_idx);
                    if idx == next_idx {
                        // First sighting of this call id.
                        tools.push(ToolAccumulator {
                            id,
                            name,
                            arguments: String::new(),
                        });
                    } else if tools[idx].name == "tool" {
                        // Upgrade the placeholder name set by an out-of-order delta.
                        tools[idx].name = name;
                    }
                }
                StreamChunk::ToolCallDelta {
                    id,
                    arguments_delta,
                } => {
                    if let Some(idx) = tool_index_by_id.get(&id).copied() {
                        tools[idx].arguments.push_str(&arguments_delta);
                    } else {
                        // Delta arrived before its Start event: create the
                        // entry with a placeholder name ("tool").
                        let idx = tools.len();
                        tool_index_by_id.insert(id.clone(), idx);
                        tools.push(ToolAccumulator {
                            id,
                            name: "tool".to_string(),
                            arguments: arguments_delta,
                        });
                    }
                }
                StreamChunk::ToolCallEnd { .. } => {}
                StreamChunk::Done { usage: done_usage } => {
                    if let Some(done_usage) = done_usage {
                        usage = done_usage;
                    }
                }
                StreamChunk::Error(message) => anyhow::bail!(message),
            }
        }

        let mut content = Vec::new();
        if !text.is_empty() {
            content.push(ContentPart::Text { text });
        }
        for tool in tools {
            content.push(ContentPart::ToolCall {
                id: tool.id,
                name: tool.name,
                arguments: tool.arguments,
                thought_signature: None,
            });
        }

        // Any accumulated tool call implies the model stopped to invoke tools.
        let finish_reason = if content
            .iter()
            .any(|part| matches!(part, ContentPart::ToolCall { .. }))
        {
            FinishReason::ToolCalls
        } else {
            FinishReason::Stop
        };

        Ok(CompletionResponse {
            message: Message {
                role: Role::Assistant,
                content,
            },
            usage,
            finish_reason,
        })
    }
1690
    /// Streams a completion through the ChatGPT Codex backend.
    ///
    /// Tries the responses WebSocket first and falls back to HTTP SSE
    /// streaming if the socket cannot be established. Requires a ChatGPT
    /// workspace/account id resolvable from the OAuth token.
    async fn complete_stream_with_chatgpt_responses(
        &self,
        request: CompletionRequest,
        access_token: String,
    ) -> Result<BoxStream<'static, StreamChunk>> {
        let account_id = self.resolved_chatgpt_account_id(&access_token).context(
            "OpenAI Codex OAuth token is missing ChatGPT workspace/account ID. Re-run `codetether auth codex --device-code`.",
        )?;
        match self
            .complete_stream_with_realtime(
                request.clone(),
                access_token.clone(),
                Some(account_id.clone()),
                "chatgpt-codex-responses-ws",
                ResponsesWsBackend::ChatGptCodex,
            )
            .await
        {
            Ok(stream) => Ok(stream),
            Err(error) => {
                // WebSocket connectivity is best-effort; HTTP streaming is the
                // reliable fallback path.
                tracing::warn!(
                    error = %error,
                    "Responses WebSocket connect failed for ChatGPT Codex backend; falling back to HTTP responses streaming"
                );
                self.complete_stream_with_chatgpt_http_responses(request, access_token, account_id)
                    .await
            }
        }
    }
1720
    /// Streams a completion through the native OpenAI API.
    ///
    /// Tries the responses WebSocket first and falls back to HTTP SSE
    /// streaming if the socket cannot be established.
    async fn complete_stream_with_openai_responses(
        &self,
        request: CompletionRequest,
        api_key: String,
    ) -> Result<BoxStream<'static, StreamChunk>> {
        match self
            .complete_stream_with_realtime(
                request.clone(),
                api_key.clone(),
                None,
                "openai-responses-ws",
                ResponsesWsBackend::OpenAi,
            )
            .await
        {
            Ok(stream) => Ok(stream),
            Err(error) => {
                // WebSocket connectivity is best-effort; HTTP streaming is the
                // reliable fallback path.
                tracing::warn!(
                    error = %error,
                    "Responses WebSocket connect failed for OpenAI backend; falling back to HTTP responses streaming"
                );
                self.complete_stream_with_openai_http_responses(request, api_key)
                    .await
            }
        }
    }
1747
    /// Streams a completion from the ChatGPT Codex backend over HTTP SSE.
    ///
    /// Fallback path used when the responses WebSocket cannot be opened.
    /// Transport errors after the initial request surface as
    /// `StreamChunk::Error` items in the returned stream.
    ///
    /// NOTE(review): this mirrors `complete_stream_with_openai_http_responses`
    /// almost line-for-line; a shared helper would remove the duplication.
    async fn complete_stream_with_chatgpt_http_responses(
        &self,
        request: CompletionRequest,
        access_token: String,
        account_id: String,
    ) -> Result<BoxStream<'static, StreamChunk>> {
        let (model, reasoning_effort, service_tier) =
            Self::resolve_model_and_reasoning_effort_and_service_tier(&request.model);
        let instructions = Self::extract_responses_instructions(&request.messages);
        let input = Self::convert_messages_to_responses_input(&request.messages);
        let tools = Self::convert_responses_tools(&request.tools);

        // `store: false` keeps the conversation out of server-side storage.
        let mut body = json!({
            "model": model,
            "instructions": instructions,
            "input": input,
            "stream": true,
            "store": false,
            "tool_choice": "auto",
            "parallel_tool_calls": true,
        });

        if !tools.is_empty() {
            body["tools"] = json!(tools);
        }
        if let Some(level) = reasoning_effort {
            body["reasoning"] = json!({ "effort": level.as_str() });
        }
        Self::apply_service_tier(&mut body, service_tier);

        tracing::info!(
            backend = "chatgpt-codex-responses-http",
            instructions_len = body
                .get("instructions")
                .and_then(|v| v.as_str())
                .map(str::len)
                .unwrap_or(0),
            input_items = body
                .get("input")
                .and_then(|v| v.as_array())
                .map(Vec::len)
                .unwrap_or(0),
            has_tools = !tools.is_empty(),
            "Sending HTTP responses request"
        );

        let response = self
            .client
            .post(format!("{}/responses", CHATGPT_CODEX_API_URL))
            .header("Authorization", format!("Bearer {}", access_token))
            // ChatGPT requires the workspace/account id on every request.
            .header("chatgpt-account-id", account_id)
            .header("Content-Type", "application/json")
            .json(&body)
            .send()
            .await
            .context("Failed to send streaming request to ChatGPT Codex backend")?;

        let status = response.status();
        if !status.is_success() {
            let body = response.text().await.unwrap_or_default();
            anyhow::bail!(Self::format_openai_api_error(status, &body, &request.model));
        }

        // Lazily re-parse the SSE byte stream into StreamChunks.
        let stream = async_stream::stream! {
            let mut parser = ResponsesSseParser::default();
            let mut byte_stream = response.bytes_stream();

            while let Some(result) = byte_stream.next().await {
                match result {
                    Ok(bytes) => {
                        for chunk in Self::parse_responses_sse_bytes(&mut parser, &bytes) {
                            yield chunk;
                        }
                    }
                    Err(error) => yield StreamChunk::Error(error.to_string()),
                }
            }

            // Flush any buffered partial event at end of stream.
            for chunk in Self::finish_responses_sse_parser(&mut parser) {
                yield chunk;
            }
        };

        Ok(Box::pin(stream))
    }
1833
    /// Streams a completion from the OpenAI `/responses` endpoint over HTTP
    /// SSE.
    ///
    /// Fallback path used when the responses WebSocket cannot be opened.
    /// Transport errors after the initial request surface as
    /// `StreamChunk::Error` items in the returned stream.
    async fn complete_stream_with_openai_http_responses(
        &self,
        request: CompletionRequest,
        api_key: String,
    ) -> Result<BoxStream<'static, StreamChunk>> {
        let (model, reasoning_effort, service_tier) =
            Self::resolve_model_and_reasoning_effort_and_service_tier(&request.model);
        let instructions = Self::extract_responses_instructions(&request.messages);
        let input = Self::convert_messages_to_responses_input(&request.messages);
        let tools = Self::convert_responses_tools(&request.tools);

        // `store: false` keeps the conversation out of server-side storage.
        let mut body = json!({
            "model": model,
            "instructions": instructions,
            "input": input,
            "stream": true,
            "store": false,
            "tool_choice": "auto",
            "parallel_tool_calls": true,
        });

        if !tools.is_empty() {
            body["tools"] = json!(tools);
        }
        if let Some(level) = reasoning_effort {
            body["reasoning"] = json!({ "effort": level.as_str() });
        }
        Self::apply_service_tier(&mut body, service_tier);

        tracing::info!(
            backend = "openai-responses-http",
            instructions_len = body
                .get("instructions")
                .and_then(|v| v.as_str())
                .map(str::len)
                .unwrap_or(0),
            input_items = body
                .get("input")
                .and_then(|v| v.as_array())
                .map(Vec::len)
                .unwrap_or(0),
            has_tools = !tools.is_empty(),
            "Sending HTTP responses request"
        );

        let response = self
            .client
            .post(format!("{}/responses", OPENAI_API_URL))
            .header("Authorization", format!("Bearer {}", api_key))
            .header("Content-Type", "application/json")
            .json(&body)
            .send()
            .await
            .context("Failed to send streaming request to OpenAI responses API")?;

        let status = response.status();
        if !status.is_success() {
            let body = response.text().await.unwrap_or_default();
            anyhow::bail!(Self::format_openai_api_error(status, &body, &request.model));
        }

        // Lazily re-parse the SSE byte stream into StreamChunks.
        let stream = async_stream::stream! {
            let mut parser = ResponsesSseParser::default();
            let mut byte_stream = response.bytes_stream();

            while let Some(result) = byte_stream.next().await {
                match result {
                    Ok(bytes) => {
                        for chunk in Self::parse_responses_sse_bytes(&mut parser, &bytes) {
                            yield chunk;
                        }
                    }
                    Err(error) => yield StreamChunk::Error(error.to_string()),
                }
            }

            // Flush any buffered partial event at end of stream.
            for chunk in Self::finish_responses_sse_parser(&mut parser) {
                yield chunk;
            }
        };

        Ok(Box::pin(stream))
    }
1917
    /// Streams a completion over the responses WebSocket.
    ///
    /// Sends a single `response.create` event and converts incoming events
    /// into `StreamChunk`s until a terminal Done/Error chunk or the socket
    /// closes. Connection errors propagate so callers can fall back to HTTP.
    async fn complete_stream_with_realtime(
        &self,
        request: CompletionRequest,
        access_token: String,
        chatgpt_account_id: Option<String>,
        backend: &'static str,
        ws_backend: ResponsesWsBackend,
    ) -> Result<BoxStream<'static, StreamChunk>> {
        let (model, reasoning_effort, service_tier) =
            Self::resolve_model_and_reasoning_effort_and_service_tier(&request.model);
        let body = Self::build_responses_ws_create_event_for_backend(
            &request,
            &model,
            reasoning_effort,
            service_tier,
            ws_backend,
        );
        tracing::info!(
            backend = backend,
            instructions_len = body
                .get("instructions")
                .and_then(|v| v.as_str())
                .map(str::len)
                .unwrap_or(0),
            input_items = body
                .get("input")
                .and_then(|v| v.as_array())
                .map(Vec::len)
                .unwrap_or(0),
            has_tools = body
                .get("tools")
                .and_then(|v| v.as_array())
                .is_some_and(|tools| !tools.is_empty()),
            "Sending responses websocket request"
        );

        let connection = self
            .connect_responses_ws_with_token(
                &access_token,
                chatgpt_account_id.as_deref(),
                ws_backend,
            )
            .await?;

        let stream = async_stream::stream! {
            let mut connection = connection;
            let mut parser = ResponsesSseParser::default();
            if let Err(error) = connection.send_event(&body).await {
                yield StreamChunk::Error(error.to_string());
                let _ = connection.close().await;
                return;
            }

            // Pump events until a terminal chunk or the socket closes.
            let mut saw_terminal = false;
            loop {
                match connection.recv_event().await {
                    Ok(Some(event)) => {
                        let mut chunks = Vec::new();
                        Self::parse_responses_event(&mut parser, &event, &mut chunks);
                        for chunk in chunks {
                            if matches!(chunk, StreamChunk::Done { .. } | StreamChunk::Error(_)) {
                                saw_terminal = true;
                            }
                            yield chunk;
                        }
                        if saw_terminal {
                            break;
                        }
                    }
                    Ok(None) => {
                        // Socket closed; only an error if we never completed.
                        if !saw_terminal {
                            yield StreamChunk::Error("Realtime WebSocket closed before response completion".to_string());
                        }
                        break;
                    }
                    Err(error) => {
                        yield StreamChunk::Error(error.to_string());
                        break;
                    }
                }
            }

            let _ = connection.close().await;
        };

        Ok(Box::pin(stream))
    }
2005}
2006
2007#[async_trait]
2008impl Provider for OpenAiCodexProvider {
2009 fn name(&self) -> &str {
2010 "openai-codex"
2011 }
2012
2013 async fn list_models(&self) -> Result<Vec<ModelInfo>> {
2014 let mut models = vec![
2015 ModelInfo {
2016 id: "gpt-5".to_string(),
2017 name: "GPT-5".to_string(),
2018 provider: "openai-codex".to_string(),
2019 context_window: 400_000,
2020 max_output_tokens: Some(128_000),
2021 supports_vision: false,
2022 supports_tools: true,
2023 supports_streaming: true,
2024 input_cost_per_million: Some(0.0),
2025 output_cost_per_million: Some(0.0),
2026 },
2027 ModelInfo {
2028 id: "gpt-5-mini".to_string(),
2029 name: "GPT-5 Mini".to_string(),
2030 provider: "openai-codex".to_string(),
2031 context_window: 264_000,
2032 max_output_tokens: Some(64_000),
2033 supports_vision: false,
2034 supports_tools: true,
2035 supports_streaming: true,
2036 input_cost_per_million: Some(0.0),
2037 output_cost_per_million: Some(0.0),
2038 },
2039 ModelInfo {
2040 id: "gpt-5.1-codex".to_string(),
2041 name: "GPT-5.1 Codex".to_string(),
2042 provider: "openai-codex".to_string(),
2043 context_window: 400_000,
2044 max_output_tokens: Some(128_000),
2045 supports_vision: false,
2046 supports_tools: true,
2047 supports_streaming: true,
2048 input_cost_per_million: Some(0.0),
2049 output_cost_per_million: Some(0.0),
2050 },
2051 ModelInfo {
2052 id: "gpt-5.2".to_string(),
2053 name: "GPT-5.2".to_string(),
2054 provider: "openai-codex".to_string(),
2055 context_window: 400_000,
2056 max_output_tokens: Some(128_000),
2057 supports_vision: false,
2058 supports_tools: true,
2059 supports_streaming: true,
2060 input_cost_per_million: Some(0.0),
2061 output_cost_per_million: Some(0.0),
2062 },
2063 ModelInfo {
2064 id: "gpt-5.3-codex".to_string(),
2065 name: "GPT-5.3 Codex".to_string(),
2066 provider: "openai-codex".to_string(),
2067 context_window: 400_000,
2068 max_output_tokens: Some(128_000),
2069 supports_vision: false,
2070 supports_tools: true,
2071 supports_streaming: true,
2072 input_cost_per_million: Some(0.0),
2073 output_cost_per_million: Some(0.0),
2074 },
2075 ModelInfo {
2076 id: "gpt-5.4".to_string(),
2077 name: "GPT-5.4".to_string(),
2078 provider: "openai-codex".to_string(),
2079 context_window: 272_000,
2080 max_output_tokens: Some(128_000),
2081 supports_vision: false,
2082 supports_tools: true,
2083 supports_streaming: true,
2084 input_cost_per_million: Some(0.0),
2085 output_cost_per_million: Some(0.0),
2086 },
2087 ModelInfo {
2088 id: "gpt-5.4-fast".to_string(),
2089 name: "GPT-5.4 Fast".to_string(),
2090 provider: "openai-codex".to_string(),
2091 context_window: 272_000,
2092 max_output_tokens: Some(128_000),
2093 supports_vision: false,
2094 supports_tools: true,
2095 supports_streaming: true,
2096 input_cost_per_million: Some(0.0),
2097 output_cost_per_million: Some(0.0),
2098 },
2099 ModelInfo {
2100 id: "gpt-5.4-pro".to_string(),
2101 name: "GPT-5.4 Pro".to_string(),
2102 provider: "openai-codex".to_string(),
2103 context_window: 272_000,
2104 max_output_tokens: Some(128_000),
2105 supports_vision: false,
2106 supports_tools: true,
2107 supports_streaming: true,
2108 input_cost_per_million: Some(0.0),
2109 output_cost_per_million: Some(0.0),
2110 },
2111 ModelInfo {
2112 id: "o3".to_string(),
2113 name: "O3".to_string(),
2114 provider: "openai-codex".to_string(),
2115 context_window: 200_000,
2116 max_output_tokens: Some(100_000),
2117 supports_vision: true,
2118 supports_tools: true,
2119 supports_streaming: true,
2120 input_cost_per_million: Some(0.0),
2121 output_cost_per_million: Some(0.0),
2122 },
2123 ModelInfo {
2124 id: "o4-mini".to_string(),
2125 name: "O4 Mini".to_string(),
2126 provider: "openai-codex".to_string(),
2127 context_window: 200_000,
2128 max_output_tokens: Some(100_000),
2129 supports_vision: true,
2130 supports_tools: true,
2131 supports_streaming: true,
2132 input_cost_per_million: Some(0.0),
2133 output_cost_per_million: Some(0.0),
2134 },
2135 ];
2136
2137 if self.using_chatgpt_backend() {
2138 models.retain(|model| Self::chatgpt_supported_models().contains(&model.id.as_str()));
2139 }
2140
2141 Ok(models)
2142 }
2143
2144 async fn complete(&self, request: CompletionRequest) -> Result<CompletionResponse> {
2145 self.validate_model_for_backend(&request.model)?;
2146 let access_token = self.get_access_token().await?;
2147 if self.using_chatgpt_backend() {
2148 return self
2149 .complete_with_chatgpt_responses(request, access_token)
2150 .await;
2151 }
2152 self.complete_with_openai_responses(request, access_token)
2153 .await
2154 }
2155
2156 async fn complete_stream(
2157 &self,
2158 request: CompletionRequest,
2159 ) -> Result<BoxStream<'static, StreamChunk>> {
2160 self.validate_model_for_backend(&request.model)?;
2161 let access_token = self.get_access_token().await?;
2162 if self.using_chatgpt_backend() {
2163 return self
2164 .complete_stream_with_chatgpt_responses(request, access_token)
2165 .await;
2166 }
2167 self.complete_stream_with_openai_responses(request, access_token)
2168 .await
2169 }
2170}
2171
2172#[cfg(test)]
2173mod tests {
2174 use super::*;
2175 use futures::stream;
2176 use tokio::io::duplex;
2177 use tokio_tungstenite::{
2178 accept_hdr_async, client_async,
2179 tungstenite::{
2180 Message as WsMessage,
2181 handshake::server::{Request as ServerRequest, Response as ServerResponse},
2182 },
2183 };
2184
    // PKCE verifier and challenge must both be non-empty and must differ
    // (the challenge is derived from the verifier).
    #[test]
    fn test_generate_pkce() {
        let pkce = OpenAiCodexProvider::generate_pkce();
        assert!(!pkce.verifier.is_empty());
        assert!(!pkce.challenge.is_empty());
        assert_ne!(pkce.verifier, pkce.challenge);
    }

    // OAuth state tokens are fixed-length (32 chars).
    #[test]
    fn test_generate_state() {
        let state = OpenAiCodexProvider::generate_state();
        assert_eq!(state.len(), 32);
    }

    // A 401 mentioning the missing scope must produce the targeted
    // re-auth guidance rather than the generic error text.
    #[test]
    fn formats_scope_error_with_actionable_message() {
        let body = r#"{"error":{"message":"Missing scopes: model.request"}}"#;
        let msg = OpenAiCodexProvider::format_openai_api_error(
            StatusCode::UNAUTHORIZED,
            body,
            "gpt-5.3-codex",
        );
        assert!(msg.contains("model.request"));
        assert!(msg.contains("codetether auth codex"));
    }

    // A known `:level` suffix is stripped and becomes the effort level.
    #[test]
    fn parses_model_suffix_for_thinking_level() {
        let (model, level) =
            OpenAiCodexProvider::resolve_model_and_reasoning_effort("gpt-5.3-codex:high");
        assert_eq!(model, "gpt-5.3-codex");
        assert_eq!(level.map(ThinkingLevel::as_str), Some("high"));
    }

    // The `-fast` alias resolves to the base model on the priority tier,
    // still honoring the `:level` suffix.
    #[test]
    fn maps_fast_model_alias_to_priority_service_tier() {
        let (model, level, service_tier) =
            OpenAiCodexProvider::resolve_model_and_reasoning_effort_and_service_tier(
                "gpt-5.4-fast:high",
            );
        assert_eq!(model, "gpt-5.4");
        assert_eq!(level.map(ThinkingLevel::as_str), Some("high"));
        assert_eq!(service_tier.map(CodexServiceTier::as_str), Some("priority"));
    }

    // Unknown suffixes are left in place and yield no effort level.
    #[test]
    fn ignores_unknown_model_suffix() {
        let (model, level) =
            OpenAiCodexProvider::resolve_model_and_reasoning_effort("gpt-5.3-codex:turbo");
        assert_eq!(model, "gpt-5.3-codex:turbo");
        assert_eq!(level, None);
    }

    // Models without reasoning support drop the level even when a valid
    // suffix is present.
    #[test]
    fn skips_reasoning_effort_for_non_reasoning_models() {
        let (model, level) = OpenAiCodexProvider::resolve_model_and_reasoning_effort("gpt-4o:high");
        assert_eq!(model, "gpt-4o");
        assert_eq!(level, None);
    }
2244
    // The default (ChatGPT) backend lists the 5.4 family but filters out
    // the pro variant, which the subscription backend does not support.
    #[tokio::test]
    async fn lists_gpt_5_4_models() {
        let provider = OpenAiCodexProvider::new();
        let models = provider
            .list_models()
            .await
            .expect("model listing should succeed");

        assert!(models.iter().any(|model| model.id == "gpt-5.4"));
        assert!(models.iter().any(|model| model.id == "gpt-5.4-fast"));
        assert!(!models.iter().any(|model| model.id == "gpt-5.4-pro"));
    }

    // Requesting the pro model on the ChatGPT backend fails validation
    // with a descriptive error.
    #[test]
    fn rejects_pro_model_for_chatgpt_backend() {
        let provider = OpenAiCodexProvider::new();
        let err = provider
            .validate_model_for_backend("gpt-5.4-pro")
            .expect_err("chatgpt backend should reject unsupported model");
        assert!(
            err.to_string()
                .contains("not supported when using Codex with a ChatGPT account")
        );
    }

    // The API-key backend has no such restriction.
    #[test]
    fn allows_pro_model_for_api_key_backend() {
        let provider = OpenAiCodexProvider::from_api_key("test-key".to_string());
        provider
            .validate_model_for_backend("gpt-5.4-pro")
            .expect("api key backend should allow pro model");
    }

    // The `-fast` alias (with effort suffix) passes ChatGPT validation
    // because it resolves to a supported base model.
    #[test]
    fn allows_fast_alias_for_chatgpt_backend() {
        let provider = OpenAiCodexProvider::new();
        provider
            .validate_model_for_backend("gpt-5.4-fast:high")
            .expect("chatgpt backend should allow fast alias");
    }
2285
    // The ChatGPT account id is read from the custom auth claim in the JWT
    // payload; no signature verification is needed for extraction.
    #[test]
    fn extracts_chatgpt_account_id_from_jwt_claims() {
        let header = URL_SAFE_NO_PAD.encode(r#"{"alg":"none","typ":"JWT"}"#);
        let payload = URL_SAFE_NO_PAD
            .encode(r#"{"https://api.openai.com/auth":{"chatgpt_account_id":"org_test123"}}"#);
        let jwt = format!("{header}.{payload}.sig");

        let account_id = OpenAiCodexProvider::extract_chatgpt_account_id(&jwt);
        assert_eq!(account_id.as_deref(), Some("org_test123"));
    }

    // Fallback: the first entry of the `organizations` claim is used when
    // the custom auth claim is absent.
    #[test]
    fn extracts_chatgpt_account_id_from_organizations_claim() {
        let header = URL_SAFE_NO_PAD.encode(r#"{"alg":"none","typ":"JWT"}"#);
        let payload = URL_SAFE_NO_PAD.encode(r#"{"organizations":[{"id":"org_from_list"}]}"#);
        let jwt = format!("{header}.{payload}.sig");

        let account_id = OpenAiCodexProvider::extract_chatgpt_account_id(&jwt);
        assert_eq!(account_id.as_deref(), Some("org_from_list"));
    }

    // All system messages, in order, are joined into the instructions field;
    // interleaved non-system messages are skipped.
    #[test]
    fn extracts_responses_instructions_from_system_messages() {
        let messages = vec![
            Message {
                role: Role::System,
                content: vec![ContentPart::Text {
                    text: "first system block".to_string(),
                }],
            },
            Message {
                role: Role::User,
                content: vec![ContentPart::Text {
                    text: "user message".to_string(),
                }],
            },
            Message {
                role: Role::System,
                content: vec![ContentPart::Text {
                    text: "second system block".to_string(),
                }],
            },
        ];

        let instructions = OpenAiCodexProvider::extract_responses_instructions(&messages);
        assert_eq!(instructions, "first system block\n\nsecond system block");
    }

    // Without any system message, the canned default instructions are used.
    #[test]
    fn falls_back_to_default_responses_instructions_without_system_message() {
        let messages = vec![Message {
            role: Role::User,
            content: vec![ContentPart::Text {
                text: "only user".to_string(),
            }],
        }];

        let instructions = OpenAiCodexProvider::extract_responses_instructions(&messages);
        assert_eq!(instructions, DEFAULT_RESPONSES_INSTRUCTIONS);
    }
2346
2347 #[test]
2348 fn responses_input_ignores_system_messages() {
2349 let messages = vec![
2350 Message {
2351 role: Role::System,
2352 content: vec![ContentPart::Text {
2353 text: "system".to_string(),
2354 }],
2355 },
2356 Message {
2357 role: Role::User,
2358 content: vec![ContentPart::Text {
2359 text: "user".to_string(),
2360 }],
2361 },
2362 ];
2363
2364 let input = OpenAiCodexProvider::convert_messages_to_responses_input(&messages);
2365 assert_eq!(input.len(), 1);
2366 assert_eq!(input[0].get("role").and_then(Value::as_str), Some("user"));
2367 }
2368
2369 #[test]
2370 fn responses_ws_request_uses_bearer_auth() {
2371 let request = OpenAiCodexProvider::build_responses_ws_request_with_base_url(
2372 "wss://example.com/v1/responses",
2373 "test-token",
2374 )
2375 .expect("request should build");
2376
2377 assert_eq!(request.uri().to_string(), "wss://example.com/v1/responses");
2378 assert_eq!(
2379 request
2380 .headers()
2381 .get("Authorization")
2382 .and_then(|v| v.to_str().ok()),
2383 Some("Bearer test-token")
2384 );
2385 assert_eq!(
2386 request
2387 .headers()
2388 .get("User-Agent")
2389 .and_then(|v| v.to_str().ok()),
2390 Some("codetether-responses-ws/1.0")
2391 );
2392 }
2393
2394 #[test]
2395 fn responses_ws_request_includes_chatgpt_account_id_when_provided() {
2396 let request = OpenAiCodexProvider::build_responses_ws_request_with_base_url_and_account_id(
2397 "wss://example.com/v1/responses",
2398 "test-token",
2399 Some("org_123"),
2400 )
2401 .expect("request should build");
2402
2403 assert_eq!(
2404 request
2405 .headers()
2406 .get("chatgpt-account-id")
2407 .and_then(|v| v.to_str().ok()),
2408 Some("org_123")
2409 );
2410 }
2411
2412 #[test]
2413 fn builds_responses_ws_create_event_for_tools() {
2414 let request = CompletionRequest {
2415 messages: vec![
2416 Message {
2417 role: Role::System,
2418 content: vec![ContentPart::Text {
2419 text: "System prompt".to_string(),
2420 }],
2421 },
2422 Message {
2423 role: Role::User,
2424 content: vec![ContentPart::Text {
2425 text: "Inspect the repo".to_string(),
2426 }],
2427 },
2428 ],
2429 tools: vec![ToolDefinition {
2430 name: "read".to_string(),
2431 description: "Read a file".to_string(),
2432 parameters: json!({
2433 "type": "object",
2434 "properties": {
2435 "path": { "type": "string" }
2436 },
2437 "required": ["path"]
2438 }),
2439 }],
2440 model: "gpt-5.4".to_string(),
2441 temperature: None,
2442 top_p: None,
2443 max_tokens: Some(8192),
2444 stop: Vec::new(),
2445 };
2446
2447 let event = OpenAiCodexProvider::build_responses_ws_create_event(
2448 &request,
2449 "gpt-5.4",
2450 Some(ThinkingLevel::High),
2451 None,
2452 );
2453
2454 assert_eq!(
2455 event.get("type").and_then(Value::as_str),
2456 Some("response.create")
2457 );
2458 assert_eq!(event.get("model").and_then(Value::as_str), Some("gpt-5.4"));
2459 assert_eq!(event.get("store").and_then(Value::as_bool), Some(false));
2460 assert_eq!(
2461 event.get("max_output_tokens").and_then(Value::as_u64),
2462 Some(8192)
2463 );
2464 assert_eq!(
2465 event.get("tools").and_then(Value::as_array).map(Vec::len),
2466 Some(1)
2467 );
2468 }
2469
    /// End-to-end exercise of `OpenAiRealtimeConnection` over an in-memory
    /// duplex pipe: the client handshake carries bearer auth, a JSON
    /// `response.create` event round-trips to a mock websocket server, the
    /// server's `response.created` reply is decoded, and the connection
    /// closes cleanly.
    #[tokio::test]
    async fn responses_ws_connection_round_trips_json_events() {
        // In-memory bidirectional pipe standing in for a TCP connection.
        let (client_io, server_io) = duplex(16 * 1024);

        let server = tokio::spawn(async move {
            // Handshake callback: assert on path and Authorization header
            // before accepting the websocket upgrade.
            let callback = |request: &ServerRequest, response: ServerResponse| {
                assert_eq!(request.uri().path(), "/v1/responses");
                assert_eq!(
                    request
                        .headers()
                        .get("Authorization")
                        .and_then(|v| v.to_str().ok()),
                    Some("Bearer test-token")
                );
                Ok(response)
            };

            let mut socket = accept_hdr_async(server_io, callback)
                .await
                .expect("server websocket handshake should succeed");
            // The first client frame must be a text frame containing the
            // JSON `response.create` event.
            let message = socket
                .next()
                .await
                .expect("server should receive message")
                .expect("server message should be valid");
            match message {
                WsMessage::Text(text) => {
                    let event: Value =
                        serde_json::from_str(&text).expect("server should parse JSON event");
                    assert_eq!(
                        event.get("type").and_then(Value::as_str),
                        Some("response.create")
                    );
                }
                other => panic!("expected text frame, got {other:?}"),
            }

            // Reply with a minimal server event for the client to consume.
            socket
                .send(WsMessage::Text(
                    json!({ "type": "response.created" }).to_string().into(),
                ))
                .await
                .expect("server should send response");
        });

        let request = OpenAiCodexProvider::build_responses_ws_request_with_base_url(
            "ws://localhost/v1/responses",
            "test-token",
        )
        .expect("client request should build");
        // Drive the client handshake over the in-memory pipe.
        let (stream, _) = client_async(request, client_io)
            .await
            .expect("client websocket handshake should succeed");
        let mut connection = OpenAiRealtimeConnection::new(stream);

        connection
            .send_event(&json!({ "type": "response.create", "model": "gpt-5.4", "input": [] }))
            .await
            .expect("client should send event");
        // recv_event returns Result<Option<Value>>: Ok(None) would mean the
        // peer closed before sending, hence the double expect.
        let event = connection
            .recv_event()
            .await
            .expect("client should read event")
            .expect("client should receive session event");
        assert_eq!(
            event.get("type").and_then(Value::as_str),
            Some("response.created")
        );
        connection
            .close()
            .await
            .expect("client should close cleanly");

        server.await.expect("server task should finish");
    }
2545
    /// The SSE parser must buffer a `data:` line that is split mid-token
    /// across two network chunks and only emit events once each record's
    /// terminating blank line has arrived.
    #[test]
    fn responses_sse_parser_buffers_split_tool_call_events() {
        let mut parser = ResponsesSseParser::default();

        // First chunk: one complete tool-call-start record plus the first
        // two bytes ("da") of the next record's "data:" prefix.
        let chunk1 = br#"data: {"type":"response.output_item.added","item":{"type":"function_call","id":"fc_1","call_id":"call_1","name":"bash"}}

da"#;
        let first = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, chunk1);
        // Only the complete record is emitted; the partial one stays buffered.
        assert_eq!(first.len(), 1);
        match &first[0] {
            // Tool-call chunks are keyed by call_id, not the item id.
            StreamChunk::ToolCallStart { id, name } => {
                assert_eq!(id, "call_1");
                assert_eq!(name, "bash");
            }
            other => panic!("expected tool start, got {other:?}"),
        }

        // Second chunk: the remainder of the split record ("ta: ..."), a
        // second argument delta, the item-done record, and the completion
        // record carrying usage totals.
        let chunk2 = br#"ta: {"type":"response.function_call_arguments.delta","item_id":"fc_1","delta":"{\"command\":"}

data: {"type":"response.function_call_arguments.delta","item_id":"fc_1","delta":"\"ls\"}"}

data: {"type":"response.output_item.done","item":{"type":"function_call","id":"fc_1","call_id":"call_1","name":"bash","arguments":"{\"command\":\"ls\"}"}}

data: {"type":"response.completed","response":{"usage":{"input_tokens":11,"output_tokens":7,"total_tokens":18}}}

"#;
        let second = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, chunk2);

        // Two deltas + tool end + done; item.done must NOT re-emit the full
        // arguments because deltas were already streamed.
        assert_eq!(second.len(), 4);
        match &second[0] {
            StreamChunk::ToolCallDelta {
                id,
                arguments_delta,
            } => {
                assert_eq!(id, "call_1");
                assert_eq!(arguments_delta, "{\"command\":");
            }
            other => panic!("expected first tool delta, got {other:?}"),
        }
        match &second[1] {
            StreamChunk::ToolCallDelta {
                id,
                arguments_delta,
            } => {
                assert_eq!(id, "call_1");
                assert_eq!(arguments_delta, "\"ls\"}");
            }
            other => panic!("expected second tool delta, got {other:?}"),
        }
        match &second[2] {
            StreamChunk::ToolCallEnd { id } => assert_eq!(id, "call_1"),
            other => panic!("expected tool end, got {other:?}"),
        }
        match &second[3] {
            StreamChunk::Done { usage } => {
                let usage = usage.as_ref().expect("expected usage");
                assert_eq!(usage.prompt_tokens, 11);
                assert_eq!(usage.completion_tokens, 7);
                assert_eq!(usage.total_tokens, 18);
            }
            other => panic!("expected done, got {other:?}"),
        }
    }
2609
    /// When no argument deltas were streamed, the parser falls back to the
    /// full `arguments` payload carried by the `output_item.done` record,
    /// emitting a synthetic start/delta/end triple for the tool call.
    #[test]
    fn responses_sse_parser_falls_back_to_done_item_arguments() {
        let mut parser = ResponsesSseParser::default();
        // Single complete SSE record: a finished function_call item with its
        // arguments inline and no preceding delta events.
        let bytes = br#"data: {"type":"response.output_item.done","item":{"type":"function_call","id":"fc_2","call_id":"call_2","name":"read","arguments":"{\"path\":\"src/main.rs\"}"}}

"#;

        let chunks = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, bytes);
        assert_eq!(chunks.len(), 3);
        match &chunks[0] {
            // Keyed by call_id, not the item id "fc_2".
            StreamChunk::ToolCallStart { id, name } => {
                assert_eq!(id, "call_2");
                assert_eq!(name, "read");
            }
            other => panic!("expected tool start, got {other:?}"),
        }
        match &chunks[1] {
            // The entire arguments string arrives as one delta.
            StreamChunk::ToolCallDelta {
                id,
                arguments_delta,
            } => {
                assert_eq!(id, "call_2");
                assert_eq!(arguments_delta, "{\"path\":\"src/main.rs\"}");
            }
            other => panic!("expected tool delta, got {other:?}"),
        }
        match &chunks[2] {
            StreamChunk::ToolCallEnd { id } => assert_eq!(id, "call_2"),
            other => panic!("expected tool end, got {other:?}"),
        }
    }
2641
    /// A final SSE record that arrives without its terminating blank line is
    /// held in the buffer; the explicit finish call must flush it and emit
    /// the synthesized start/delta/end triple.
    #[test]
    fn responses_sse_parser_flushes_final_event_without_trailing_blank_line() {
        let mut parser = ResponsesSseParser::default();
        // Note: no trailing "\n\n" — the record is syntactically incomplete.
        let bytes = br#"data: {"type":"response.output_item.done","item":{"type":"function_call","id":"fc_3","call_id":"call_3","name":"read","arguments":"{\"path\":\"src/lib.rs\"}"}}"#;

        // Nothing is emitted while the record is still unterminated.
        let first = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, bytes);
        assert!(first.is_empty());

        // End-of-stream flush forces the buffered record through.
        let flushed = OpenAiCodexProvider::finish_responses_sse_parser(&mut parser);
        assert_eq!(flushed.len(), 3);
        match &flushed[0] {
            StreamChunk::ToolCallStart { id, name } => {
                assert_eq!(id, "call_3");
                assert_eq!(name, "read");
            }
            other => panic!("expected tool start, got {other:?}"),
        }
        match &flushed[1] {
            StreamChunk::ToolCallDelta {
                id,
                arguments_delta,
            } => {
                assert_eq!(id, "call_3");
                assert_eq!(arguments_delta, "{\"path\":\"src/lib.rs\"}");
            }
            other => panic!("expected tool delta, got {other:?}"),
        }
        match &flushed[2] {
            StreamChunk::ToolCallEnd { id } => assert_eq!(id, "call_3"),
            other => panic!("expected tool end, got {other:?}"),
        }
    }
2674
2675 #[tokio::test]
2676 async fn collect_stream_completion_updates_tool_name_after_early_delta() {
2677 let stream = stream::iter(vec![
2678 StreamChunk::ToolCallDelta {
2679 id: "call_4".to_string(),
2680 arguments_delta: "{\"path\":\"src/provider/openai_codex.rs\"}".to_string(),
2681 },
2682 StreamChunk::ToolCallStart {
2683 id: "call_4".to_string(),
2684 name: "read".to_string(),
2685 },
2686 StreamChunk::Done { usage: None },
2687 ]);
2688
2689 let response = OpenAiCodexProvider::collect_stream_completion(Box::pin(stream))
2690 .await
2691 .expect("stream completion should succeed");
2692
2693 assert!(matches!(
2694 response.message.content.first(),
2695 Some(ContentPart::ToolCall { id, name, arguments, .. })
2696 if id == "call_4"
2697 && name == "read"
2698 && arguments == "{\"path\":\"src/provider/openai_codex.rs\"}"
2699 ));
2700 }
2701}