1use super::{
9 CompletionRequest, CompletionResponse, ContentPart, FinishReason, Message, ModelInfo, Provider,
10 Role, StreamChunk, ToolDefinition, Usage,
11};
12use anyhow::{Context, Result};
13use async_trait::async_trait;
14use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
15use futures::stream::BoxStream;
16use futures::{SinkExt, StreamExt};
17use reqwest::{Client, StatusCode};
18use serde::{Deserialize, Serialize};
19use serde_json::{Value, json};
20use sha2::{Digest, Sha256};
21use std::collections::HashMap;
22use std::io::ErrorKind;
23use std::sync::Arc;
24use tokio::io::{AsyncRead, AsyncWrite};
25use tokio::net::TcpStream;
26use tokio::sync::RwLock;
27use tokio_tungstenite::{
28 MaybeTlsStream, WebSocketStream, connect_async,
29 tungstenite::{
30 Error as WsError, Message as WsMessage,
31 client::IntoClientRequest,
32 http::{HeaderValue, Request},
33 },
34};
35
// REST and WebSocket endpoints for the two supported backends: the public
// OpenAI API (API-key auth) and the ChatGPT Codex backend (OAuth auth).
const OPENAI_API_URL: &str = "https://api.openai.com/v1";
const CHATGPT_CODEX_API_URL: &str = "https://chatgpt.com/backend-api/codex";
const OPENAI_RESPONSES_WS_URL: &str = "wss://api.openai.com/v1/responses";
const CHATGPT_CODEX_RESPONSES_WS_URL: &str = "wss://chatgpt.com/backend-api/codex/responses";
// OAuth 2.0 (PKCE) endpoints and client configuration for the Codex login flow.
const AUTH_ISSUER: &str = "https://auth.openai.com";
const AUTHORIZE_URL: &str = "https://auth.openai.com/oauth/authorize";
const TOKEN_URL: &str = "https://auth.openai.com/oauth/token";
const CLIENT_ID: &str = "app_EMoamEEZ73f0CkXaXp7hrann";
// Loopback redirect served by the local OAuth callback listener.
const REDIRECT_URI: &str = "http://localhost:1455/auth/callback";
const SCOPE: &str = "openid profile email offline_access";
// Environment variables that override the reasoning-effort level (low/medium/high);
// the first takes precedence over the second (see `env_thinking_level`).
const THINKING_LEVEL_ENV: &str = "CODETETHER_OPENAI_CODEX_THINKING_LEVEL";
const REASONING_EFFORT_ENV: &str = "CODETETHER_OPENAI_CODEX_REASONING_EFFORT";
// Fallback `instructions` value when a request carries no system messages.
const DEFAULT_RESPONSES_INSTRUCTIONS: &str = "You are a helpful assistant.";
49
/// Reasoning-effort level forwarded to the Responses API (`reasoning.effort`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ThinkingLevel {
    Low,
    Medium,
    High,
}

/// Which Responses WebSocket backend a connection targets.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ResponsesWsBackend {
    /// Public OpenAI API (`api.openai.com`), API-key auth.
    OpenAi,
    /// ChatGPT Codex backend (`chatgpt.com`), OAuth auth.
    ChatGptCodex,
}

/// Service tier requested from the backend; currently only `priority`
/// (selected via the `gpt-5.4-fast` model alias).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CodexServiceTier {
    Priority,
}
67
68impl ThinkingLevel {
69 fn parse(raw: &str) -> Option<Self> {
70 match raw.trim().to_ascii_lowercase().as_str() {
71 "low" => Some(Self::Low),
72 "medium" => Some(Self::Medium),
73 "high" => Some(Self::High),
74 _ => None,
75 }
76 }
77
78 fn as_str(self) -> &'static str {
79 match self {
80 Self::Low => "low",
81 Self::Medium => "medium",
82 Self::High => "high",
83 }
84 }
85}
86
impl CodexServiceTier {
    /// Wire-format name of this tier (`service_tier` field in request payloads).
    fn as_str(self) -> &'static str {
        match self {
            Self::Priority => "priority",
        }
    }
}
94
// In-memory cache of the current OAuth access token. Expiry is tracked with a
// monotonic `Instant` so checks are immune to wall-clock adjustments.
#[allow(dead_code)]
struct CachedTokens {
    access_token: String,
    #[allow(dead_code)]
    refresh_token: String,
    // Monotonic deadline at which the access token expires.
    expires_at: std::time::Instant,
}

/// Persisted OAuth tokens for the Codex login, as serialized to/from storage.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OAuthCredentials {
    // OpenID Connect id_token; `default` so older credential files without it
    // still deserialize.
    #[serde(default)]
    pub id_token: Option<String>,
    // ChatGPT workspace/account id; when absent it is extracted from the JWTs.
    #[serde(default)]
    pub chatgpt_account_id: Option<String>,
    pub access_token: String,
    pub refresh_token: String,
    // Absolute expiry, seconds since the Unix epoch.
    pub expires_at: u64,
}

/// PKCE verifier/challenge pair for the OAuth authorization-code flow (S256).
struct PkcePair {
    verifier: String,
    challenge: String,
}

/// Incremental state for one streamed tool (function) call.
#[derive(Debug, Default)]
struct ResponsesToolState {
    call_id: String,
    name: Option<String>,
    // NOTE(review): flags appear to track whether start/end stream chunks have
    // been emitted for this call — confirm against `parse_responses_event`.
    started: bool,
    finished: bool,
    // Argument text already forwarded downstream; presumably used to compute
    // deltas against the full argument string — confirm in the event parser.
    emitted_arguments: String,
}

/// Accumulates raw SSE lines, the current event's data lines, and per-tool
/// streaming state while parsing a Responses API stream.
#[derive(Debug, Default)]
struct ResponsesSseParser {
    line_buffer: String,
    event_data_lines: Vec<String>,
    tools: HashMap<String, ResponsesToolState>,
}
137
/// A live Realtime/Responses WebSocket connection, generic over the underlying
/// stream type (defaults to a TLS-capable TCP stream).
pub struct OpenAiRealtimeConnection<S = MaybeTlsStream<TcpStream>> {
    stream: WebSocketStream<S>,
}

// Manual impl: `WebSocketStream<S>` is not `Debug` for arbitrary `S`, so only
// the type name is printed.
impl<S> std::fmt::Debug for OpenAiRealtimeConnection<S> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("OpenAiRealtimeConnection").finish()
    }
}
147
impl<S> OpenAiRealtimeConnection<S>
where
    S: AsyncRead + AsyncWrite + Unpin,
{
    /// Wraps an already-established WebSocket stream.
    fn new(stream: WebSocketStream<S>) -> Self {
        Self { stream }
    }

    /// Serializes `event` as JSON and sends it as a single text frame.
    pub async fn send_event(&mut self, event: &Value) -> Result<()> {
        let payload = serde_json::to_string(event).context("Failed to serialize Realtime event")?;
        self.stream
            .send(WsMessage::Text(payload.into()))
            .await
            .context("Failed to send Realtime event")?;
        Ok(())
    }

    /// Receives the next JSON event from the server.
    ///
    /// Text and binary frames are parsed as JSON and returned. Pings are
    /// answered with pongs inline; pongs and raw frames are ignored. Returns
    /// `Ok(None)` once the server sends a close frame or the stream ends.
    pub async fn recv_event(&mut self) -> Result<Option<Value>> {
        while let Some(message) = self.stream.next().await {
            let message = message.context("Realtime WebSocket receive failed")?;
            match message {
                WsMessage::Text(text) => {
                    let event = serde_json::from_str(&text)
                        .context("Failed to parse Realtime text event")?;
                    return Ok(Some(event));
                }
                WsMessage::Binary(bytes) => {
                    // Some servers deliver JSON in binary frames; decode as UTF-8.
                    let text = String::from_utf8(bytes.to_vec())
                        .context("Realtime binary event was not valid UTF-8")?;
                    let event = serde_json::from_str(&text)
                        .context("Failed to parse Realtime binary event")?;
                    return Ok(Some(event));
                }
                WsMessage::Ping(payload) => {
                    // Keep the connection alive while waiting for the next event.
                    self.stream
                        .send(WsMessage::Pong(payload))
                        .await
                        .context("Failed to respond to Realtime ping")?;
                }
                WsMessage::Pong(_) => {}
                WsMessage::Frame(_) => {}
                WsMessage::Close(_) => return Ok(None),
            }
        }

        Ok(None)
    }

    /// Sends a close frame. An already-closed or torn-down connection is
    /// treated as success so shutdown is idempotent.
    pub async fn close(&mut self) -> Result<()> {
        match self.stream.send(WsMessage::Close(None)).await {
            Ok(()) => {}
            Err(WsError::ConnectionClosed) | Err(WsError::AlreadyClosed) => {}
            // The peer may drop the socket before our close frame lands; these
            // I/O errors all mean "already gone", not a failure to close.
            Err(WsError::Io(err))
                if matches!(
                    err.kind(),
                    ErrorKind::BrokenPipe
                        | ErrorKind::ConnectionReset
                        | ErrorKind::ConnectionAborted
                        | ErrorKind::NotConnected
                ) => {}
            Err(err) => return Err(err).context("Failed to close Realtime WebSocket"),
        }
        Ok(())
    }
}
213
/// Provider for OpenAI Codex. Authenticates either with a static API key
/// (public OpenAI API) or with stored OAuth credentials (ChatGPT Codex
/// backend); the presence of a key selects the backend (see
/// `using_chatgpt_backend`).
pub struct OpenAiCodexProvider {
    client: Client,
    // Lazily-populated cache of the current access token (OAuth mode only).
    cached_tokens: Arc<RwLock<Option<CachedTokens>>>,
    // When set, requests use this key directly and OAuth is bypassed.
    static_api_key: Option<String>,
    chatgpt_account_id: Option<String>,
    stored_credentials: Option<OAuthCredentials>,
}

// Manual impl so secrets (API key, tokens) never appear in debug output —
// only booleans describing what is configured.
impl std::fmt::Debug for OpenAiCodexProvider {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("OpenAiCodexProvider")
            .field("has_api_key", &self.static_api_key.is_some())
            .field("has_chatgpt_account_id", &self.chatgpt_account_id.is_some())
            .field("has_credentials", &self.stored_credentials.is_some())
            .finish()
    }
}

impl Default for OpenAiCodexProvider {
    fn default() -> Self {
        Self::new()
    }
}
238
239impl OpenAiCodexProvider {
    /// Model names accepted by the ChatGPT Codex backend. Anything else is
    /// rejected up front by `validate_model_for_backend`.
    fn chatgpt_supported_models() -> &'static [&'static str] {
        &[
            "gpt-5",
            "gpt-5-mini",
            "gpt-5.1-codex",
            "gpt-5.2",
            "gpt-5.3-codex",
            "gpt-5.4",
            "gpt-5.4-fast",
            "o3",
            "o4-mini",
        ]
    }
253
254 fn model_is_supported_by_backend(&self, model: &str) -> bool {
255 if !self.using_chatgpt_backend() {
256 return true;
257 }
258 Self::chatgpt_supported_models().contains(&model)
259 }
260
261 fn validate_model_for_backend(&self, model: &str) -> Result<()> {
262 let (resolved_model, _, _) =
263 Self::resolve_model_and_reasoning_effort_and_service_tier(model);
264 if self.model_is_supported_by_backend(model)
265 || self.model_is_supported_by_backend(&resolved_model)
266 {
267 return Ok(());
268 }
269
270 if self.using_chatgpt_backend() {
271 anyhow::bail!(
272 "Model '{}' is not supported when using Codex with a ChatGPT account. Supported models: {}",
273 model,
274 Self::chatgpt_supported_models().join(", ")
275 );
276 }
277
278 Ok(())
279 }
280
    /// Creates a provider that authenticates with a static OpenAI API key
    /// against the public OpenAI API (no OAuth, no ChatGPT backend).
    pub fn from_api_key(api_key: String) -> Self {
        Self {
            client: crate::provider::shared_http::shared_client().clone(),
            cached_tokens: Arc::new(RwLock::new(None)),
            static_api_key: Some(api_key),
            chatgpt_account_id: None,
            stored_credentials: None,
        }
    }

    /// Creates a provider from stored OAuth credentials (ChatGPT backend).
    ///
    /// The ChatGPT account id is resolved from, in order: the stored field,
    /// the id_token's JWT claims, then the access token's JWT claims.
    pub fn from_credentials(credentials: OAuthCredentials) -> Self {
        let chatgpt_account_id = credentials
            .chatgpt_account_id
            .clone()
            .or_else(|| {
                credentials
                    .id_token
                    .as_deref()
                    .and_then(Self::extract_chatgpt_account_id_from_jwt)
            })
            .or_else(|| Self::extract_chatgpt_account_id_from_jwt(&credentials.access_token));

        Self {
            client: crate::provider::shared_http::shared_client().clone(),
            cached_tokens: Arc::new(RwLock::new(None)),
            static_api_key: None,
            chatgpt_account_id,
            stored_credentials: Some(credentials),
        }
    }

    /// Creates an unauthenticated provider; token acquisition fails until
    /// credentials or an API key are supplied.
    #[allow(dead_code)]
    pub fn new() -> Self {
        Self {
            client: crate::provider::shared_http::shared_client().clone(),
            cached_tokens: Arc::new(RwLock::new(None)),
            static_api_key: None,
            chatgpt_account_id: None,
            stored_credentials: None,
        }
    }
324
325 fn generate_pkce() -> PkcePair {
327 let random_bytes: [u8; 32] = {
328 let timestamp = std::time::SystemTime::now()
329 .duration_since(std::time::UNIX_EPOCH)
330 .map(|d| d.as_nanos())
331 .unwrap_or(0);
332
333 let mut bytes = [0u8; 32];
334 let ts_bytes = timestamp.to_le_bytes();
335 let tid = std::thread::current().id();
336 let tid_repr = format!("{:?}", tid);
337 let tid_hash = Sha256::digest(tid_repr.as_bytes());
338
339 bytes[0..8].copy_from_slice(&ts_bytes[0..8]);
340 bytes[8..24].copy_from_slice(&tid_hash[0..16]);
341 bytes[24..].copy_from_slice(&Sha256::digest(ts_bytes)[0..8]);
342 bytes
343 };
344 let verifier = URL_SAFE_NO_PAD.encode(random_bytes);
345
346 let mut hasher = Sha256::new();
347 hasher.update(verifier.as_bytes());
348 let challenge_bytes = hasher.finalize();
349 let challenge = URL_SAFE_NO_PAD.encode(challenge_bytes);
350
351 PkcePair {
352 verifier,
353 challenge,
354 }
355 }
356
357 fn generate_state() -> String {
359 let timestamp = std::time::SystemTime::now()
360 .duration_since(std::time::UNIX_EPOCH)
361 .map(|d| d.as_nanos())
362 .unwrap_or(0);
363 let random: [u8; 8] = {
364 let ptr = Box::into_raw(Box::new(timestamp)) as usize;
365 let bytes = ptr.to_le_bytes();
366 let mut arr = [0u8; 8];
367 arr.copy_from_slice(&bytes);
368 arr
369 };
370 format!("{:016x}{:016x}", timestamp, u64::from_le_bytes(random))
371 }
372
    /// Builds the OAuth authorization URL for the PKCE flow.
    ///
    /// Returns `(url, pkce_verifier, state)`; the caller must retain the
    /// verifier (for the token exchange) and the state (to validate the
    /// callback). The extra query flags mirror the Codex CLI's flow.
    #[allow(dead_code)]
    pub fn get_authorization_url() -> (String, String, String) {
        let pkce = Self::generate_pkce();
        let state = Self::generate_state();

        let url = format!(
            "{}?response_type=code&client_id={}&redirect_uri={}&scope={}&code_challenge={}&code_challenge_method=S256&state={}&id_token_add_organizations=true&codex_cli_simplified_flow=true&originator=codex_cli_rs",
            AUTHORIZE_URL,
            CLIENT_ID,
            urlencoding::encode(REDIRECT_URI),
            urlencoding::encode(SCOPE),
            pkce.challenge,
            state
        );

        (url, pkce.verifier, state)
    }
391
    /// OAuth issuer base URL.
    pub fn oauth_issuer() -> &'static str {
        AUTH_ISSUER
    }

    /// OAuth client id registered for the Codex CLI flow.
    pub fn oauth_client_id() -> &'static str {
        CLIENT_ID
    }

    // Currently the WebSocket URL is the base URL verbatim; kept as a single
    // seam so query parameters can be appended in one place later.
    fn responses_ws_url_with_base(base_url: &str) -> String {
        base_url.to_string()
    }

    /// Responses WebSocket endpoint for the public OpenAI backend.
    pub fn responses_ws_url() -> String {
        Self::responses_ws_url_with_base(OPENAI_RESPONSES_WS_URL)
    }
407
    /// Builds the HTTP upgrade request for a Responses WebSocket.
    ///
    /// Sets the bearer `Authorization` header and a fixed `User-Agent`, plus
    /// a `ChatGPT-Account-ID` header when a non-blank account id is given.
    /// Fails fast on an empty/blank token rather than sending a request that
    /// is guaranteed to be rejected.
    fn build_responses_ws_request_with_base_url_and_account_id(
        base_url: &str,
        token: &str,
        chatgpt_account_id: Option<&str>,
    ) -> Result<Request<()>> {
        if token.trim().is_empty() {
            anyhow::bail!("Responses WebSocket token cannot be empty");
        }

        let url = Self::responses_ws_url_with_base(base_url);
        let mut request = url
            .into_client_request()
            .context("Failed to build Responses WebSocket request")?;
        request.headers_mut().insert(
            "Authorization",
            HeaderValue::from_str(&format!("Bearer {token}"))
                .context("Failed to build Responses Authorization header")?,
        );
        request.headers_mut().insert(
            "User-Agent",
            HeaderValue::from_static("codetether-responses-ws/1.0"),
        );
        if let Some(account_id) = chatgpt_account_id.filter(|id| !id.trim().is_empty()) {
            request.headers_mut().insert(
                "ChatGPT-Account-ID",
                HeaderValue::from_str(account_id)
                    .context("Failed to build ChatGPT account header")?,
            );
        }
        Ok(request)
    }
439
    /// Upgrade request for an arbitrary base URL with no account header.
    fn build_responses_ws_request_with_base_url(
        base_url: &str,
        token: &str,
    ) -> Result<Request<()>> {
        Self::build_responses_ws_request_with_base_url_and_account_id(base_url, token, None)
    }

    /// Upgrade request for the public OpenAI Responses endpoint.
    fn build_responses_ws_request_with_account_id(
        token: &str,
        chatgpt_account_id: Option<&str>,
    ) -> Result<Request<()>> {
        Self::build_responses_ws_request_with_base_url_and_account_id(
            OPENAI_RESPONSES_WS_URL,
            token,
            chatgpt_account_id,
        )
    }

    /// Upgrade request for the ChatGPT Codex Responses endpoint.
    fn build_chatgpt_responses_ws_request(
        token: &str,
        chatgpt_account_id: Option<&str>,
    ) -> Result<Request<()>> {
        Self::build_responses_ws_request_with_base_url_and_account_id(
            CHATGPT_CODEX_RESPONSES_WS_URL,
            token,
            chatgpt_account_id,
        )
    }
468
469 async fn connect_responses_ws_with_token(
470 &self,
471 token: &str,
472 chatgpt_account_id: Option<&str>,
473 backend: ResponsesWsBackend,
474 ) -> Result<OpenAiRealtimeConnection> {
475 let request = match backend {
476 ResponsesWsBackend::OpenAi => {
477 Self::build_responses_ws_request_with_account_id(token, chatgpt_account_id)?
478 }
479 ResponsesWsBackend::ChatGptCodex => {
480 Self::build_chatgpt_responses_ws_request(token, chatgpt_account_id)?
481 }
482 };
483 let (stream, _response) = connect_async(request)
484 .await
485 .context("Failed to connect to OpenAI Responses WebSocket")?;
486 Ok(OpenAiRealtimeConnection::new(stream))
487 }
488
489 pub async fn connect_responses_ws(&self) -> Result<OpenAiRealtimeConnection> {
490 let token = self.get_access_token().await?;
491 let account_id = self.resolved_chatgpt_account_id(&token);
492 let backend = if self.using_chatgpt_backend() {
493 ResponsesWsBackend::ChatGptCodex
494 } else {
495 ResponsesWsBackend::OpenAi
496 };
497 self.connect_responses_ws_with_token(&token, account_id.as_deref(), backend)
498 .await
499 }
500
    /// Completes the OAuth code exchange using the default loopback redirect.
    #[allow(dead_code)]
    pub async fn exchange_code(code: &str, verifier: &str) -> Result<OAuthCredentials> {
        Self::exchange_code_with_redirect_uri(code, verifier, REDIRECT_URI).await
    }
506
507 pub async fn exchange_code_with_redirect_uri(
509 code: &str,
510 verifier: &str,
511 redirect_uri: &str,
512 ) -> Result<OAuthCredentials> {
513 let client = crate::provider::shared_http::shared_client().clone();
514 let form_body = format!(
515 "grant_type={}&client_id={}&code={}&code_verifier={}&redirect_uri={}",
516 urlencoding::encode("authorization_code"),
517 CLIENT_ID,
518 urlencoding::encode(code),
519 urlencoding::encode(verifier),
520 urlencoding::encode(redirect_uri),
521 );
522
523 let response = client
524 .post(TOKEN_URL)
525 .header("Content-Type", "application/x-www-form-urlencoded")
526 .body(form_body)
527 .send()
528 .await
529 .context("Failed to exchange authorization code")?;
530
531 if !response.status().is_success() {
532 let body = response.text().await.unwrap_or_default();
533 anyhow::bail!("OAuth token exchange failed: {}", body);
534 }
535
536 #[derive(Deserialize)]
537 struct TokenResponse {
538 #[serde(default)]
539 id_token: Option<String>,
540 access_token: String,
541 refresh_token: String,
542 expires_in: u64,
543 }
544
545 let tokens: TokenResponse = response
546 .json()
547 .await
548 .context("Failed to parse token response")?;
549
550 let expires_at = std::time::SystemTime::now()
551 .duration_since(std::time::UNIX_EPOCH)
552 .context("System time error")?
553 .as_secs()
554 + tokens.expires_in;
555
556 let chatgpt_account_id = tokens
557 .id_token
558 .as_deref()
559 .and_then(Self::extract_chatgpt_account_id_from_jwt);
560
561 Ok(OAuthCredentials {
562 id_token: tokens.id_token,
563 chatgpt_account_id,
564 access_token: tokens.access_token,
565 refresh_token: tokens.refresh_token,
566 expires_at,
567 })
568 }
569
570 pub async fn exchange_id_token_for_api_key(id_token: &str) -> Result<String> {
571 #[derive(Deserialize)]
572 struct ExchangeResponse {
573 access_token: String,
574 }
575
576 let client = crate::provider::shared_http::shared_client().clone();
577 let form_body = format!(
578 "grant_type={}&client_id={}&requested_token={}&subject_token={}&subject_token_type={}",
579 urlencoding::encode("urn:ietf:params:oauth:grant-type:token-exchange"),
580 urlencoding::encode(CLIENT_ID),
581 urlencoding::encode("openai-api-key"),
582 urlencoding::encode(id_token),
583 urlencoding::encode("urn:ietf:params:oauth:token-type:id_token"),
584 );
585
586 let response = client
587 .post(TOKEN_URL)
588 .header("Content-Type", "application/x-www-form-urlencoded")
589 .body(form_body)
590 .send()
591 .await
592 .context("Failed to exchange id_token for OpenAI API key")?;
593
594 let status = response.status();
595 if !status.is_success() {
596 let body = response.text().await.unwrap_or_default();
597 anyhow::bail!("API key token exchange failed ({}): {}", status, body);
598 }
599
600 let payload: ExchangeResponse = response
601 .json()
602 .await
603 .context("Failed to parse API key token exchange response")?;
604 if payload.access_token.trim().is_empty() {
605 anyhow::bail!("API key token exchange returned an empty access token");
606 }
607 Ok(payload.access_token)
608 }
609
610 async fn refresh_access_token(&self, refresh_token: &str) -> Result<OAuthCredentials> {
612 let form_body = format!(
613 "grant_type={}&refresh_token={}&client_id={}",
614 urlencoding::encode("refresh_token"),
615 urlencoding::encode(refresh_token),
616 CLIENT_ID,
617 );
618
619 let response = self
620 .client
621 .post(TOKEN_URL)
622 .header("Content-Type", "application/x-www-form-urlencoded")
623 .body(form_body)
624 .send()
625 .await
626 .context("Failed to refresh access token")?;
627
628 if !response.status().is_success() {
629 let body = response.text().await.unwrap_or_default();
630 anyhow::bail!("Token refresh failed: {}", body);
631 }
632
633 #[derive(Deserialize)]
634 struct TokenResponse {
635 access_token: String,
636 refresh_token: String,
637 expires_in: u64,
638 }
639
640 let tokens: TokenResponse = response
641 .json()
642 .await
643 .context("Failed to parse refresh response")?;
644
645 let expires_at = std::time::SystemTime::now()
646 .duration_since(std::time::UNIX_EPOCH)
647 .context("System time error")?
648 .as_secs()
649 + tokens.expires_in;
650
651 Ok(OAuthCredentials {
652 id_token: None,
653 chatgpt_account_id: self.chatgpt_account_id.clone(),
654 access_token: tokens.access_token,
655 refresh_token: tokens.refresh_token,
656 expires_at,
657 })
658 }
659
660 async fn get_access_token(&self) -> Result<String> {
662 if let Some(ref api_key) = self.static_api_key {
663 return Ok(api_key.clone());
664 }
665
666 {
667 let cache = self.cached_tokens.read().await;
668 if let Some(ref tokens) = *cache
669 && tokens
670 .expires_at
671 .duration_since(std::time::Instant::now())
672 .as_secs()
673 > 300
674 {
675 return Ok(tokens.access_token.clone());
676 }
677 }
678
679 let mut cache = self.cached_tokens.write().await;
680
681 let creds = if let Some(ref stored) = self.stored_credentials {
682 let now = std::time::SystemTime::now()
683 .duration_since(std::time::UNIX_EPOCH)
684 .context("System time error")?
685 .as_secs();
686
687 if stored.expires_at > now + 300 {
688 stored.clone()
689 } else {
690 self.refresh_access_token(&stored.refresh_token).await?
691 }
692 } else {
693 anyhow::bail!("No OAuth credentials available. Run OAuth flow first.");
694 };
695
696 let expires_in = creds.expires_at
697 - std::time::SystemTime::now()
698 .duration_since(std::time::UNIX_EPOCH)
699 .context("System time error")?
700 .as_secs();
701
702 let cached = CachedTokens {
703 access_token: creds.access_token.clone(),
704 refresh_token: creds.refresh_token.clone(),
705 expires_at: std::time::Instant::now() + std::time::Duration::from_secs(expires_in),
706 };
707
708 let token = cached.access_token.clone();
709 *cache = Some(cached);
710 Ok(token)
711 }
712
    /// Converts internal messages to Chat Completions `messages` entries.
    ///
    /// - Tool messages: the first `ToolResult` part becomes
    ///   `{"role": "tool", "tool_call_id", "content"}`; any further parts are
    ///   ignored, and a tool message without one degrades to empty content.
    /// - Assistant messages: text parts are concatenated (no separator) and
    ///   `ToolCall` parts become OpenAI `tool_calls`; `content` is JSON null
    ///   when there is no text but there are tool calls.
    /// - System/user messages: text parts joined with newlines.
    #[allow(dead_code)]
    fn convert_messages(messages: &[Message]) -> Vec<Value> {
        messages
            .iter()
            .map(|msg| {
                let role = match msg.role {
                    Role::System => "system",
                    Role::User => "user",
                    Role::Assistant => "assistant",
                    Role::Tool => "tool",
                };

                match msg.role {
                    Role::Tool => {
                        if let Some(ContentPart::ToolResult {
                            tool_call_id,
                            content,
                        }) = msg.content.first()
                        {
                            json!({
                                "role": "tool",
                                "tool_call_id": tool_call_id,
                                "content": content
                            })
                        } else {
                            json!({ "role": role, "content": "" })
                        }
                    }
                    Role::Assistant => {
                        let text: String = msg
                            .content
                            .iter()
                            .filter_map(|p| match p {
                                ContentPart::Text { text } => Some(text.clone()),
                                _ => None,
                            })
                            .collect::<Vec<_>>()
                            .join("");

                        let tool_calls: Vec<Value> = msg
                            .content
                            .iter()
                            .filter_map(|p| match p {
                                ContentPart::ToolCall {
                                    id,
                                    name,
                                    arguments,
                                    ..
                                } => Some(json!({
                                    "id": id,
                                    "type": "function",
                                    "function": {
                                        "name": name,
                                        "arguments": arguments
                                    }
                                })),
                                _ => None,
                            })
                            .collect();

                        if tool_calls.is_empty() {
                            json!({ "role": "assistant", "content": text })
                        } else {
                            // Null content signals "tool calls only" to the API.
                            json!({
                                "role": "assistant",
                                "content": if text.is_empty() { Value::Null } else { json!(text) },
                                "tool_calls": tool_calls
                            })
                        }
                    }
                    _ => {
                        let text: String = msg
                            .content
                            .iter()
                            .filter_map(|p| match p {
                                ContentPart::Text { text } => Some(text.clone()),
                                _ => None,
                            })
                            .collect::<Vec<_>>()
                            .join("\n");
                        json!({ "role": role, "content": text })
                    }
                }
            })
            .collect()
    }
799
800 #[allow(dead_code)]
801 fn convert_tools(tools: &[ToolDefinition]) -> Vec<Value> {
802 tools
803 .iter()
804 .map(|t| {
805 json!({
806 "type": "function",
807 "function": {
808 "name": t.name,
809 "description": t.description,
810 "parameters": t.parameters
811 }
812 })
813 })
814 .collect()
815 }
816
817 fn convert_responses_tools(tools: &[ToolDefinition]) -> Vec<Value> {
818 tools
819 .iter()
820 .map(|t| {
821 json!({
822 "type": "function",
823 "name": t.name,
824 "description": t.description,
825 "strict": false,
826 "parameters": t.parameters
827 })
828 })
829 .collect()
830 }
831
832 fn extract_responses_instructions(messages: &[Message]) -> String {
833 let instructions = messages
834 .iter()
835 .filter(|msg| matches!(msg.role, Role::System))
836 .filter_map(|msg| {
837 let text = msg
838 .content
839 .iter()
840 .filter_map(|p| match p {
841 ContentPart::Text { text } => Some(text.clone()),
842 _ => None,
843 })
844 .collect::<Vec<_>>()
845 .join("\n");
846 (!text.is_empty()).then_some(text)
847 })
848 .collect::<Vec<_>>()
849 .join("\n\n");
850
851 if instructions.is_empty() {
852 DEFAULT_RESPONSES_INSTRUCTIONS.to_string()
853 } else {
854 instructions
855 }
856 }
857
    /// Converts internal messages to Responses API `input` items.
    ///
    /// System messages are skipped here — they travel in `instructions`
    /// (see `extract_responses_instructions`). User and assistant text become
    /// `message` items; assistant `ToolCall` parts become `function_call`
    /// items; tool results become `function_call_output` items, but only when
    /// the matching call id appeared earlier in the transcript — orphaned
    /// outputs are dropped with a warning instead of being sent unpaired.
    fn convert_messages_to_responses_input(messages: &[Message]) -> Vec<Value> {
        let mut input = Vec::new();
        // Call ids seen on assistant tool calls so far; used to pair outputs.
        let mut known_tool_call_ids = std::collections::HashSet::new();

        for msg in messages {
            match msg.role {
                Role::System => {}
                Role::User => {
                    let text: String = msg
                        .content
                        .iter()
                        .filter_map(|p| match p {
                            ContentPart::Text { text } => Some(text.clone()),
                            _ => None,
                        })
                        .collect::<Vec<_>>()
                        .join("\n");
                    if !text.is_empty() {
                        input.push(json!({
                            "type": "message",
                            "role": "user",
                            "content": [{
                                "type": "input_text",
                                "text": text
                            }]
                        }));
                    }
                }
                Role::Assistant => {
                    let text: String = msg
                        .content
                        .iter()
                        .filter_map(|p| match p {
                            ContentPart::Text { text } => Some(text.clone()),
                            _ => None,
                        })
                        .collect::<Vec<_>>()
                        .join("");

                    if !text.is_empty() {
                        input.push(json!({
                            "type": "message",
                            "role": "assistant",
                            "content": [{
                                "type": "output_text",
                                "text": text
                            }]
                        }));
                    }

                    for part in &msg.content {
                        if let ContentPart::ToolCall {
                            id,
                            name,
                            arguments,
                            ..
                        } = part
                        {
                            known_tool_call_ids.insert(id.clone());
                            input.push(json!({
                                "type": "function_call",
                                "call_id": id,
                                "name": name,
                                "arguments": arguments,
                            }));
                        }
                    }
                }
                Role::Tool => {
                    for part in &msg.content {
                        if let ContentPart::ToolResult {
                            tool_call_id,
                            content,
                        } = part
                        {
                            if known_tool_call_ids.contains(tool_call_id) {
                                input.push(json!({
                                    "type": "function_call_output",
                                    "call_id": tool_call_id,
                                    "output": content,
                                }));
                            } else {
                                tracing::warn!(
                                    tool_call_id = %tool_call_id,
                                    "Skipping orphaned function_call_output while building Codex responses input"
                                );
                            }
                        }
                    }
                }
            }
        }

        input
    }
953
    /// True when OAuth/ChatGPT Codex is in use — i.e. no static API key.
    fn using_chatgpt_backend(&self) -> bool {
        self.static_api_key.is_none()
    }
957
    /// Best-effort extraction of the ChatGPT account id from a JWT.
    ///
    /// Base64url-decodes the payload segment (no signature verification — the
    /// token is only inspected, not trusted) and checks, in order: the
    /// `https://api.openai.com/auth` claim object, a top-level
    /// `chatgpt_account_id` claim, then the id of the first entry of an
    /// `organizations` claim. Returns `None` when nothing matches or the
    /// token is malformed.
    fn extract_chatgpt_account_id_from_jwt(jwt: &str) -> Option<String> {
        let payload_b64 = jwt.split('.').nth(1)?;
        let payload = URL_SAFE_NO_PAD.decode(payload_b64).ok()?;
        let value: Value = serde_json::from_slice(&payload).ok()?;

        value
            .get("https://api.openai.com/auth")
            .and_then(Value::as_object)
            .and_then(|auth| auth.get("chatgpt_account_id"))
            .and_then(Value::as_str)
            .map(|s| s.to_string())
            .or_else(|| {
                value
                    .get("chatgpt_account_id")
                    .and_then(Value::as_str)
                    .map(|s| s.to_string())
            })
            .or_else(|| {
                value
                    .get("organizations")
                    .and_then(Value::as_array)
                    .and_then(|orgs| orgs.first())
                    .and_then(|org| org.get("id"))
                    .and_then(Value::as_str)
                    .map(|s| s.to_string())
            })
    }
985
    /// Public wrapper around the JWT account-id extraction.
    pub fn extract_chatgpt_account_id(token: &str) -> Option<String> {
        Self::extract_chatgpt_account_id_from_jwt(token)
    }

    /// Resolves the ChatGPT account id for a request: explicit configuration
    /// first, then the access token's claims, then the stored credentials
    /// (id_token claims before the persisted field).
    fn resolved_chatgpt_account_id(&self, access_token: &str) -> Option<String> {
        self.chatgpt_account_id
            .clone()
            .or_else(|| Self::extract_chatgpt_account_id_from_jwt(access_token))
            .or_else(|| {
                self.stored_credentials.as_ref().and_then(|creds| {
                    creds
                        .id_token
                        .as_deref()
                        .and_then(Self::extract_chatgpt_account_id_from_jwt)
                        .or_else(|| creds.chatgpt_account_id.clone())
                })
            })
    }
1004
1005 fn parse_responses_usage(usage: Option<&Value>) -> Usage {
1006 let Some(usage) = usage else {
1007 return Usage::default();
1008 };
1009
1010 let prompt_tokens = usage
1011 .get("prompt_tokens")
1012 .or_else(|| usage.get("input_tokens"))
1013 .and_then(Value::as_u64)
1014 .unwrap_or(0) as usize;
1015
1016 let completion_tokens = usage
1017 .get("completion_tokens")
1018 .or_else(|| usage.get("output_tokens"))
1019 .and_then(Value::as_u64)
1020 .unwrap_or(0) as usize;
1021
1022 let total_tokens = usage
1023 .get("total_tokens")
1024 .and_then(Value::as_u64)
1025 .unwrap_or((prompt_tokens + completion_tokens) as u64)
1026 as usize;
1027
1028 Usage {
1029 prompt_tokens,
1030 completion_tokens,
1031 total_tokens,
1032 cache_read_tokens: None,
1033 cache_write_tokens: None,
1034 }
1035 }
1036
1037 fn extract_json_string(value: Option<&Value>) -> Option<String> {
1038 match value {
1039 Some(Value::String(text)) => Some(text.clone()),
1040 Some(other) => serde_json::to_string(other).ok(),
1041 None => None,
1042 }
1043 }
1044
    /// Converts a completed Responses `output` array into content parts.
    ///
    /// `message` items contribute their `output_text`/`text` segments as one
    /// concatenated `Text` part (empty text is dropped); `function_call`
    /// items become `ToolCall` parts, preferring `call_id` over `id` and
    /// skipping items missing any of call id, name, or arguments. Also
    /// returns whether any tool call was produced.
    #[allow(dead_code)]
    fn parse_responses_output_parts(output: &[Value]) -> (Vec<ContentPart>, bool) {
        let mut parts = Vec::new();
        let mut has_tool_calls = false;

        for item in output {
            match item.get("type").and_then(Value::as_str) {
                Some("message") => {
                    if let Some(content) = item.get("content").and_then(Value::as_array) {
                        let text = content
                            .iter()
                            .filter(|segment| {
                                matches!(
                                    segment.get("type").and_then(Value::as_str),
                                    Some("output_text") | Some("text")
                                )
                            })
                            .filter_map(|segment| {
                                segment
                                    .get("text")
                                    .and_then(Value::as_str)
                                    .map(str::to_string)
                            })
                            .collect::<Vec<_>>()
                            .join("");
                        if !text.is_empty() {
                            parts.push(ContentPart::Text { text });
                        }
                    }
                }
                Some("function_call") => {
                    let call_id = item
                        .get("call_id")
                        .or_else(|| item.get("id"))
                        .and_then(Value::as_str);
                    let name = item.get("name").and_then(Value::as_str);
                    let arguments = item.get("arguments").and_then(Value::as_str);
                    if let (Some(call_id), Some(name), Some(arguments)) = (call_id, name, arguments)
                    {
                        has_tool_calls = true;
                        parts.push(ContentPart::ToolCall {
                            id: call_id.to_string(),
                            name: name.to_string(),
                            arguments: arguments.to_string(),
                            thought_signature: None,
                        });
                    }
                }
                // Other item types (e.g. reasoning) are ignored here.
                _ => {}
            }
        }

        (parts, has_tool_calls)
    }
1099
1100 fn parse_model_thinking_level(model: &str) -> (String, Option<ThinkingLevel>) {
1101 let Some((base_model, level_str)) = model.rsplit_once(':') else {
1102 return (model.to_string(), None);
1103 };
1104 let Some(level) = ThinkingLevel::parse(level_str) else {
1105 return (model.to_string(), None);
1106 };
1107 if base_model.trim().is_empty() {
1108 return (model.to_string(), None);
1109 }
1110 (base_model.to_string(), Some(level))
1111 }
1112
1113 fn env_thinking_level() -> Option<ThinkingLevel> {
1114 std::env::var(THINKING_LEVEL_ENV)
1115 .ok()
1116 .or_else(|| std::env::var(REASONING_EFFORT_ENV).ok())
1117 .as_deref()
1118 .and_then(ThinkingLevel::parse)
1119 }
1120
1121 fn model_supports_reasoning_effort(model: &str) -> bool {
1122 let normalized = model.to_ascii_lowercase();
1123 normalized.starts_with("gpt-5")
1124 || normalized.starts_with("o1")
1125 || normalized.starts_with("o3")
1126 || normalized.starts_with("o4")
1127 }
1128
1129 fn parse_service_tier_model_alias(model: &str) -> (String, Option<CodexServiceTier>) {
1130 match model {
1131 "gpt-5.4-fast" => ("gpt-5.4".to_string(), Some(CodexServiceTier::Priority)),
1133 _ => (model.to_string(), None),
1134 }
1135 }
1136
    /// Like `resolve_model_and_reasoning_effort_and_service_tier`, dropping
    /// the service tier.
    fn resolve_model_and_reasoning_effort(model: &str) -> (String, Option<ThinkingLevel>) {
        let (base_model, level, _) =
            Self::resolve_model_and_reasoning_effort_and_service_tier(model);
        (base_model, level)
    }

    /// Splits a user-facing model string into (base model, reasoning effort,
    /// service tier).
    ///
    /// Order matters: the `:level` suffix is parsed first (environment
    /// variables as fallback), then tier aliases like `gpt-5.4-fast` are
    /// mapped to their base model. Models without reasoning support drop the
    /// level entirely but keep any tier.
    fn resolve_model_and_reasoning_effort_and_service_tier(
        model: &str,
    ) -> (String, Option<ThinkingLevel>, Option<CodexServiceTier>) {
        let (base_model, level_from_model) = Self::parse_model_thinking_level(model);
        let level = level_from_model.or_else(Self::env_thinking_level);
        let (base_model, service_tier) = Self::parse_service_tier_model_alias(&base_model);
        if !Self::model_supports_reasoning_effort(&base_model) {
            return (base_model, None, service_tier);
        }
        (base_model, level, service_tier)
    }
1154
1155 fn apply_service_tier(payload: &mut Value, service_tier: Option<CodexServiceTier>) {
1156 if let Some(service_tier) = service_tier {
1157 payload["service_tier"] = json!(service_tier.as_str());
1158 }
1159 }
1160
    /// Maps a failed OpenAI HTTP response to an actionable error message.
    ///
    /// Two 401 cases get specific re-auth guidance — a token missing the
    /// `model.request` scope, and auth missing a ChatGPT workspace/account
    /// binding (detected by substring sniffing of the body). Everything else
    /// falls through to a generic status + body message.
    fn format_openai_api_error(status: StatusCode, body: &str, model: &str) -> String {
        if status == StatusCode::UNAUTHORIZED && body.contains("Missing scopes: model.request") {
            return format!(
                "OpenAI Codex OAuth token is missing required scope `model.request` for model `{model}`.\n\
                Re-run `codetether auth codex` and complete OAuth approval with a ChatGPT subscription account \
                that has model access in your org/project."
            );
        }
        if status == StatusCode::UNAUTHORIZED
            && (body.contains("chatgpt-account-id")
                || body.contains("ChatGPT-Account-ID")
                || body.contains("workspace"))
        {
            return "OpenAI Codex auth is missing a ChatGPT workspace/account identifier.\n\
                Re-run `codetether auth codex --device-code` and sign in to the intended workspace."
                .to_string();
        }
        format!("OpenAI API error ({status}): {body}")
    }
1180
    /// Builds a `response.create` event targeting the public OpenAI backend.
    fn build_responses_ws_create_event(
        request: &CompletionRequest,
        model: &str,
        reasoning_effort: Option<ThinkingLevel>,
        service_tier: Option<CodexServiceTier>,
    ) -> Value {
        Self::build_responses_ws_create_event_for_backend(
            request,
            model,
            reasoning_effort,
            service_tier,
            ResponsesWsBackend::OpenAi,
        )
    }

    /// Builds the `response.create` WebSocket event for a backend.
    ///
    /// Always sends model, `store: false`, instructions, and input. Tools,
    /// reasoning effort, and service tier are included only when present.
    /// `tool_choice`, `parallel_tool_calls`, and `max_output_tokens` are sent
    /// only to the public OpenAI backend; the ChatGPT Codex payload omits
    /// them.
    fn build_responses_ws_create_event_for_backend(
        request: &CompletionRequest,
        model: &str,
        reasoning_effort: Option<ThinkingLevel>,
        service_tier: Option<CodexServiceTier>,
        backend: ResponsesWsBackend,
    ) -> Value {
        let instructions = Self::extract_responses_instructions(&request.messages);
        let input = Self::convert_messages_to_responses_input(&request.messages);
        let tools = Self::convert_responses_tools(&request.tools);

        let mut event = json!({
            "type": "response.create",
            "model": model,
            "store": false,
            "instructions": instructions,
            "input": input,
        });

        if !tools.is_empty() {
            event["tools"] = json!(tools);
        }
        if let Some(level) = reasoning_effort {
            event["reasoning"] = json!({ "effort": level.as_str() });
        }
        Self::apply_service_tier(&mut event, service_tier);
        if backend == ResponsesWsBackend::OpenAi {
            event["tool_choice"] = json!("auto");
            event["parallel_tool_calls"] = json!(true);
        }
        if backend == ResponsesWsBackend::OpenAi
            && let Some(max_tokens) = request.max_tokens
        {
            event["max_output_tokens"] = json!(max_tokens);
        }

        event
    }
1234
1235 fn finish_responses_tool_call(
1236 parser: &mut ResponsesSseParser,
1237 key: &str,
1238 chunks: &mut Vec<StreamChunk>,
1239 ) {
1240 if let Some(state) = parser.tools.get_mut(key)
1241 && !state.finished
1242 {
1243 chunks.push(StreamChunk::ToolCallEnd {
1244 id: state.call_id.clone(),
1245 });
1246 state.finished = true;
1247 }
1248 }
1249
    /// Translate a single Responses-API event into zero or more stream chunks.
    ///
    /// Handles text deltas, tool-call lifecycle events, terminal
    /// completed/failed events (including usage extraction), and error events.
    /// Unknown event types are silently ignored.
    fn parse_responses_event(
        parser: &mut ResponsesSseParser,
        event: &Value,
        chunks: &mut Vec<StreamChunk>,
    ) {
        match event.get("type").and_then(Value::as_str) {
            Some("response.output_text.delta") => {
                if let Some(delta) = event.get("delta").and_then(Value::as_str) {
                    chunks.push(StreamChunk::Text(delta.to_string()));
                }
            }
            Some("response.output_item.added") => {
                if let Some(item) = event.get("item") {
                    Self::record_responses_tool_item(parser, item, chunks, false);
                }
            }
            Some("response.function_call_arguments.delta") => {
                Self::record_responses_tool_arguments(parser, &event, chunks, false);
            }
            Some("response.function_call_arguments.done") => {
                Self::record_responses_tool_arguments(parser, &event, chunks, true);
            }
            Some("response.output_item.done") => {
                // Finalize a function-call item: key prefers `id`, falling back
                // to `call_id` — the same keying used in record_responses_tool_item.
                if let Some(item) = event.get("item")
                    && item.get("type").and_then(Value::as_str) == Some("function_call")
                    && let Some(key) = item
                        .get("id")
                        .and_then(Value::as_str)
                        .or_else(|| item.get("call_id").and_then(Value::as_str))
                {
                    Self::record_responses_tool_item(parser, item, chunks, true);
                    Self::finish_responses_tool_call(parser, key, chunks);
                }
            }
            Some("response.completed") | Some("response.done") => {
                // Sweep the final output list for function calls in case any
                // per-item events were missed during streaming.
                if let Some(output) = event
                    .get("response")
                    .and_then(|response| response.get("output"))
                    .and_then(Value::as_array)
                {
                    for item in output {
                        if item.get("type").and_then(Value::as_str) == Some("function_call")
                            && let Some(key) = item
                                .get("id")
                                .and_then(Value::as_str)
                                .or_else(|| item.get("call_id").and_then(Value::as_str))
                        {
                            Self::record_responses_tool_item(parser, item, chunks, true);
                            Self::finish_responses_tool_call(parser, key, chunks);
                        }
                    }
                }
                // A terminal failed/cancelled/incomplete status becomes an
                // Error chunk instead of a Done chunk.
                let failed_message = event
                    .get("response")
                    .and_then(|response| response.get("status"))
                    .and_then(Value::as_str)
                    .filter(|status| matches!(*status, "failed" | "cancelled" | "incomplete"))
                    .map(|_| {
                        event
                            .get("response")
                            .and_then(|response| response.get("error"))
                            .and_then(|error| error.get("message"))
                            .and_then(Value::as_str)
                            .unwrap_or("Response failed")
                            .to_string()
                    });
                if let Some(message) = failed_message {
                    chunks.push(StreamChunk::Error(message));
                    return;
                }
                let usage = event
                    .get("response")
                    .and_then(|response| response.get("usage"))
                    .map(|usage| Self::parse_responses_usage(Some(usage)));
                chunks.push(StreamChunk::Done { usage });
            }
            Some("response.failed") => {
                // Error message may live under `response.error` or a top-level `error`.
                let message = event
                    .get("response")
                    .and_then(|response| response.get("error"))
                    .and_then(|error| error.get("message"))
                    .or_else(|| event.get("error").and_then(|error| error.get("message")))
                    .and_then(Value::as_str)
                    .unwrap_or("Response failed")
                    .to_string();
                chunks.push(StreamChunk::Error(message));
            }
            Some("error") => {
                let message = event
                    .get("error")
                    .and_then(|error| error.get("message"))
                    .or_else(|| event.get("message"))
                    .and_then(Value::as_str)
                    .unwrap_or("Realtime error")
                    .to_string();
                chunks.push(StreamChunk::Error(message));
            }
            _ => {}
        }
    }
1350
1351 fn parse_responses_sse_event(
1352 parser: &mut ResponsesSseParser,
1353 data: &str,
1354 chunks: &mut Vec<StreamChunk>,
1355 ) {
1356 if data == "[DONE]" {
1357 chunks.push(StreamChunk::Done { usage: None });
1358 return;
1359 }
1360
1361 let Ok(event): Result<Value, _> = serde_json::from_str(data) else {
1362 return;
1363 };
1364
1365 Self::parse_responses_event(parser, &event, chunks);
1366 }
1367
    /// Track a `function_call` output item in the parser and emit start/delta
    /// chunks as needed.
    ///
    /// Items are keyed by `id` (falling back to `call_id`); the public call id
    /// prefers `call_id` over `id`. Returns the call id on success, or `None`
    /// for non-function-call items or items missing both identifiers.
    /// `include_arguments` controls whether the item's `arguments` field is
    /// emitted (used for `done` events, where arguments arrive in full).
    fn record_responses_tool_item(
        parser: &mut ResponsesSseParser,
        item: &Value,
        chunks: &mut Vec<StreamChunk>,
        include_arguments: bool,
    ) -> Option<String> {
        if item.get("type").and_then(Value::as_str) != Some("function_call") {
            return None;
        }

        let key = item
            .get("id")
            .and_then(Value::as_str)
            .or_else(|| item.get("call_id").and_then(Value::as_str))?;
        let call_id = item
            .get("call_id")
            .and_then(Value::as_str)
            .or_else(|| item.get("id").and_then(Value::as_str))?
            .to_string();
        let name = item.get("name").and_then(Value::as_str).map(str::to_string);
        let arguments = if include_arguments {
            Self::extract_json_string(item.get("arguments"))
        } else {
            None
        };

        let state = parser
            .tools
            .entry(key.to_string())
            .or_insert_with(|| ResponsesToolState {
                call_id: call_id.clone(),
                ..ResponsesToolState::default()
            });
        // Backfill identity/name on a state that was created earlier from a
        // partial event.
        if state.call_id.is_empty() {
            state.call_id = call_id.clone();
        }
        if state.name.is_none() {
            state.name = name;
        }
        // Emit the start chunk only once, and only when a name is known.
        if !state.started
            && let Some(name) = state.name.clone()
        {
            chunks.push(StreamChunk::ToolCallStart {
                id: state.call_id.clone(),
                name,
            });
            state.started = true;
        }
        if let Some(arguments) = arguments {
            Self::emit_missing_responses_tool_arguments(state, arguments, chunks);
        }

        Some(state.call_id.clone())
    }
1422
    /// Fold a `function_call_arguments.{delta,done}` event into the tool state
    /// keyed by `item_id` (falling back to `call_id`), emitting start and
    /// argument-delta chunks.
    ///
    /// When `use_final_arguments` is set, the event's full `arguments` string
    /// is preferred over the incremental `delta`.
    fn record_responses_tool_arguments(
        parser: &mut ResponsesSseParser,
        event: &Value,
        chunks: &mut Vec<StreamChunk>,
        use_final_arguments: bool,
    ) {
        let key = event
            .get("item_id")
            .and_then(Value::as_str)
            .or_else(|| event.get("call_id").and_then(Value::as_str));
        let Some(key) = key else {
            return;
        };

        // If the event carries no explicit call_id, reuse the keying id.
        let fallback_call_id = event
            .get("call_id")
            .and_then(Value::as_str)
            .unwrap_or(key)
            .to_string();
        let state = parser
            .tools
            .entry(key.to_string())
            .or_insert_with(|| ResponsesToolState {
                call_id: fallback_call_id.clone(),
                ..ResponsesToolState::default()
            });
        if state.call_id.is_empty() {
            state.call_id = fallback_call_id;
        }

        // Arguments may arrive before the output_item.added event; start the
        // tool call here if the event happens to carry a name.
        if !state.started
            && let Some(name) = event.get("name").and_then(Value::as_str)
        {
            state.name = Some(name.to_string());
            chunks.push(StreamChunk::ToolCallStart {
                id: state.call_id.clone(),
                name: name.to_string(),
            });
            state.started = true;
        }

        let arguments = if use_final_arguments {
            Self::extract_json_string(event.get("arguments")).or_else(|| {
                event
                    .get("delta")
                    .and_then(Value::as_str)
                    .map(str::to_string)
            })
        } else {
            event
                .get("delta")
                .and_then(Value::as_str)
                .map(str::to_string)
        };

        if let Some(arguments) = arguments {
            Self::emit_missing_responses_tool_arguments(state, arguments, chunks);
        }
    }
1482
1483 fn emit_missing_responses_tool_arguments(
1484 state: &mut ResponsesToolState,
1485 arguments: String,
1486 chunks: &mut Vec<StreamChunk>,
1487 ) {
1488 let delta = if arguments.starts_with(&state.emitted_arguments) {
1489 arguments[state.emitted_arguments.len()..].to_string()
1490 } else if state.emitted_arguments.is_empty() {
1491 arguments.clone()
1492 } else if arguments == state.emitted_arguments {
1493 String::new()
1494 } else {
1495 arguments.clone()
1496 };
1497
1498 if !delta.is_empty() {
1499 chunks.push(StreamChunk::ToolCallDelta {
1500 id: state.call_id.clone(),
1501 arguments_delta: delta.clone(),
1502 });
1503 state.emitted_arguments.push_str(&delta);
1504 }
1505 }
1506
    /// Feed raw SSE bytes into the line-oriented parser, returning chunks for
    /// any events whose terminating blank line arrived within this buffer.
    /// Partial trailing lines stay buffered for the next call.
    fn parse_responses_sse_bytes(
        parser: &mut ResponsesSseParser,
        bytes: &[u8],
    ) -> Vec<StreamChunk> {
        // NOTE(review): lossy decode of an arbitrary byte chunk can mangle a
        // multi-byte UTF-8 character split across network reads — confirm
        // payloads are effectively ASCII-safe or consider buffering bytes.
        parser.line_buffer.push_str(&String::from_utf8_lossy(bytes));
        let mut chunks = Vec::new();

        while let Some(line_end) = parser.line_buffer.find('\n') {
            let mut line = parser.line_buffer[..line_end].to_string();
            parser.line_buffer.drain(..=line_end);

            // Tolerate CRLF line endings.
            if line.ends_with('\r') {
                line.pop();
            }

            // A blank line terminates an SSE event; dispatch buffered data lines.
            if line.is_empty() {
                if !parser.event_data_lines.is_empty() {
                    let data = parser.event_data_lines.join("\n");
                    parser.event_data_lines.clear();
                    Self::parse_responses_sse_event(parser, &data, &mut chunks);
                }
                continue;
            }

            // Only `data:` fields are consumed; other SSE fields are ignored.
            if let Some(data) = line.strip_prefix("data:") {
                let data = data.strip_prefix(' ').unwrap_or(data);
                parser.event_data_lines.push(data.to_string());
            }
        }

        chunks
    }
1539
1540 fn finish_responses_sse_parser(parser: &mut ResponsesSseParser) -> Vec<StreamChunk> {
1541 let mut chunks = Vec::new();
1542
1543 if !parser.line_buffer.is_empty() {
1544 let mut line = std::mem::take(&mut parser.line_buffer);
1545 if line.ends_with('\r') {
1546 line.pop();
1547 }
1548 if let Some(data) = line.strip_prefix("data:") {
1549 let data = data.strip_prefix(' ').unwrap_or(data);
1550 parser.event_data_lines.push(data.to_string());
1551 }
1552 }
1553
1554 if !parser.event_data_lines.is_empty() {
1555 let data = parser.event_data_lines.join("\n");
1556 parser.event_data_lines.clear();
1557 Self::parse_responses_sse_event(parser, &data, &mut chunks);
1558 }
1559
1560 chunks
1561 }
1562
1563 async fn complete_with_chatgpt_responses(
1564 &self,
1565 request: CompletionRequest,
1566 access_token: String,
1567 ) -> Result<CompletionResponse> {
1568 let stream = self
1569 .complete_stream_with_chatgpt_responses(request, access_token)
1570 .await?;
1571 Self::collect_stream_completion(stream).await
1572 }
1573
1574 async fn complete_with_openai_responses(
1575 &self,
1576 request: CompletionRequest,
1577 api_key: String,
1578 ) -> Result<CompletionResponse> {
1579 let stream = self
1580 .complete_stream_with_openai_responses(request, api_key)
1581 .await?;
1582 Self::collect_stream_completion(stream).await
1583 }
1584
    /// Drain a chunk stream into a single `CompletionResponse`.
    ///
    /// Text deltas are concatenated; tool-call chunks are accumulated per call
    /// id (a delta arriving before its start chunk registers the tool under
    /// the placeholder name "tool", replaced when the start chunk arrives).
    /// An `Error` chunk aborts with that message; the last `Done` chunk's
    /// usage wins.
    async fn collect_stream_completion(
        mut stream: BoxStream<'static, StreamChunk>,
    ) -> Result<CompletionResponse> {
        // Per-tool-call accumulation state; position in `tools` preserves
        // first-seen ordering for the final content list.
        #[derive(Default)]
        struct ToolAccumulator {
            id: String,
            name: String,
            arguments: String,
        }

        let mut text = String::new();
        let mut tools = Vec::<ToolAccumulator>::new();
        let mut tool_index_by_id = HashMap::<String, usize>::new();
        let mut usage = Usage::default();

        while let Some(chunk) = stream.next().await {
            match chunk {
                StreamChunk::Text(delta) => text.push_str(&delta),
                StreamChunk::ToolCallStart { id, name } => {
                    let next_idx = tools.len();
                    let idx = *tool_index_by_id.entry(id.clone()).or_insert(next_idx);
                    if idx == next_idx {
                        // First sighting of this call id.
                        tools.push(ToolAccumulator {
                            id,
                            name,
                            arguments: String::new(),
                        });
                    } else if tools[idx].name == "tool" {
                        // Upgrade the placeholder name set by an earlier delta.
                        tools[idx].name = name;
                    }
                }
                StreamChunk::ToolCallDelta {
                    id,
                    arguments_delta,
                } => {
                    if let Some(idx) = tool_index_by_id.get(&id).copied() {
                        tools[idx].arguments.push_str(&arguments_delta);
                    } else {
                        // Delta before its start chunk: register with a
                        // placeholder name to be fixed up later.
                        let idx = tools.len();
                        tool_index_by_id.insert(id.clone(), idx);
                        tools.push(ToolAccumulator {
                            id,
                            name: "tool".to_string(),
                            arguments: arguments_delta,
                        });
                    }
                }
                StreamChunk::ToolCallEnd { .. } => {}
                StreamChunk::Done { usage: done_usage } => {
                    if let Some(done_usage) = done_usage {
                        usage = done_usage;
                    }
                }
                StreamChunk::Error(message) => anyhow::bail!(message),
            }
        }

        let mut content = Vec::new();
        if !text.is_empty() {
            content.push(ContentPart::Text { text });
        }
        for tool in tools {
            content.push(ContentPart::ToolCall {
                id: tool.id,
                name: tool.name,
                arguments: tool.arguments,
                thought_signature: None,
            });
        }

        // Any tool call implies the model stopped to invoke tools.
        let finish_reason = if content
            .iter()
            .any(|part| matches!(part, ContentPart::ToolCall { .. }))
        {
            FinishReason::ToolCalls
        } else {
            FinishReason::Stop
        };

        Ok(CompletionResponse {
            message: Message {
                role: Role::Assistant,
                content,
            },
            usage,
            finish_reason,
        })
    }
1673
    /// Stream a completion via the ChatGPT Codex backend.
    ///
    /// Tries the Responses WebSocket first; on connection failure, logs a
    /// warning and falls back to HTTP SSE streaming. Requires a ChatGPT
    /// workspace/account id resolvable from the access token.
    async fn complete_stream_with_chatgpt_responses(
        &self,
        request: CompletionRequest,
        access_token: String,
    ) -> Result<BoxStream<'static, StreamChunk>> {
        let account_id = self.resolved_chatgpt_account_id(&access_token).context(
            "OpenAI Codex OAuth token is missing ChatGPT workspace/account ID. Re-run `codetether auth codex --device-code`.",
        )?;
        match self
            .complete_stream_with_realtime(
                request.clone(),
                access_token.clone(),
                Some(account_id.clone()),
                "chatgpt-codex-responses-ws",
                ResponsesWsBackend::ChatGptCodex,
            )
            .await
        {
            Ok(stream) => Ok(stream),
            Err(error) => {
                tracing::warn!(
                    error = %error,
                    "Responses WebSocket connect failed for ChatGPT Codex backend; falling back to HTTP responses streaming"
                );
                self.complete_stream_with_chatgpt_http_responses(request, access_token, account_id)
                    .await
            }
        }
    }
1703
    /// Stream a completion via the first-party OpenAI Responses backend.
    ///
    /// Tries the Responses WebSocket first; on connection failure, logs a
    /// warning and falls back to HTTP SSE streaming.
    async fn complete_stream_with_openai_responses(
        &self,
        request: CompletionRequest,
        api_key: String,
    ) -> Result<BoxStream<'static, StreamChunk>> {
        match self
            .complete_stream_with_realtime(
                request.clone(),
                api_key.clone(),
                None,
                "openai-responses-ws",
                ResponsesWsBackend::OpenAi,
            )
            .await
        {
            Ok(stream) => Ok(stream),
            Err(error) => {
                tracing::warn!(
                    error = %error,
                    "Responses WebSocket connect failed for OpenAI backend; falling back to HTTP responses streaming"
                );
                self.complete_stream_with_openai_http_responses(request, api_key)
                    .await
            }
        }
    }
1730
    /// HTTP SSE fallback for streaming completions against the ChatGPT Codex
    /// backend. Builds a `/responses` request body, POSTs it with the OAuth
    /// token and `chatgpt-account-id` header, and parses the SSE byte stream
    /// into `StreamChunk`s.
    async fn complete_stream_with_chatgpt_http_responses(
        &self,
        request: CompletionRequest,
        access_token: String,
        account_id: String,
    ) -> Result<BoxStream<'static, StreamChunk>> {
        let (model, reasoning_effort, service_tier) =
            Self::resolve_model_and_reasoning_effort_and_service_tier(&request.model);
        let instructions = Self::extract_responses_instructions(&request.messages);
        let input = Self::convert_messages_to_responses_input(&request.messages);
        let tools = Self::convert_responses_tools(&request.tools);

        let mut body = json!({
            "model": model,
            "instructions": instructions,
            "input": input,
            "stream": true,
            "store": false,
            "tool_choice": "auto",
            "parallel_tool_calls": true,
        });

        if !tools.is_empty() {
            body["tools"] = json!(tools);
        }
        if let Some(level) = reasoning_effort {
            body["reasoning"] = json!({ "effort": level.as_str() });
        }
        Self::apply_service_tier(&mut body, service_tier);

        tracing::info!(
            backend = "chatgpt-codex-responses-http",
            instructions_len = body
                .get("instructions")
                .and_then(|v| v.as_str())
                .map(str::len)
                .unwrap_or(0),
            input_items = body
                .get("input")
                .and_then(|v| v.as_array())
                .map(Vec::len)
                .unwrap_or(0),
            has_tools = !tools.is_empty(),
            "Sending HTTP responses request"
        );

        let response = self
            .client
            .post(format!("{}/responses", CHATGPT_CODEX_API_URL))
            .header("Authorization", format!("Bearer {}", access_token))
            .header("chatgpt-account-id", account_id)
            .header("Content-Type", "application/json")
            .json(&body)
            .send()
            .await
            .context("Failed to send streaming request to ChatGPT Codex backend")?;

        // Surface non-2xx responses as formatted errors before streaming.
        let status = response.status();
        if !status.is_success() {
            let body = response.text().await.unwrap_or_default();
            anyhow::bail!(Self::format_openai_api_error(status, &body, &request.model));
        }

        // Incrementally parse the SSE byte stream; transport errors become
        // Error chunks rather than aborting the stream.
        let stream = async_stream::stream! {
            let mut parser = ResponsesSseParser::default();
            let mut byte_stream = response.bytes_stream();

            while let Some(result) = byte_stream.next().await {
                match result {
                    Ok(bytes) => {
                        for chunk in Self::parse_responses_sse_bytes(&mut parser, &bytes) {
                            yield chunk;
                        }
                    }
                    Err(error) => yield StreamChunk::Error(error.to_string()),
                }
            }

            // Flush any buffered partial event at end of stream.
            for chunk in Self::finish_responses_sse_parser(&mut parser) {
                yield chunk;
            }
        };

        Ok(Box::pin(stream))
    }
1816
    /// HTTP SSE fallback for streaming completions against the first-party
    /// OpenAI `/responses` endpoint, authenticated with an API key. Mirrors
    /// the ChatGPT variant minus the `chatgpt-account-id` header.
    async fn complete_stream_with_openai_http_responses(
        &self,
        request: CompletionRequest,
        api_key: String,
    ) -> Result<BoxStream<'static, StreamChunk>> {
        let (model, reasoning_effort, service_tier) =
            Self::resolve_model_and_reasoning_effort_and_service_tier(&request.model);
        let instructions = Self::extract_responses_instructions(&request.messages);
        let input = Self::convert_messages_to_responses_input(&request.messages);
        let tools = Self::convert_responses_tools(&request.tools);

        let mut body = json!({
            "model": model,
            "instructions": instructions,
            "input": input,
            "stream": true,
            "store": false,
            "tool_choice": "auto",
            "parallel_tool_calls": true,
        });

        if !tools.is_empty() {
            body["tools"] = json!(tools);
        }
        if let Some(level) = reasoning_effort {
            body["reasoning"] = json!({ "effort": level.as_str() });
        }
        Self::apply_service_tier(&mut body, service_tier);

        tracing::info!(
            backend = "openai-responses-http",
            instructions_len = body
                .get("instructions")
                .and_then(|v| v.as_str())
                .map(str::len)
                .unwrap_or(0),
            input_items = body
                .get("input")
                .and_then(|v| v.as_array())
                .map(Vec::len)
                .unwrap_or(0),
            has_tools = !tools.is_empty(),
            "Sending HTTP responses request"
        );

        let response = self
            .client
            .post(format!("{}/responses", OPENAI_API_URL))
            .header("Authorization", format!("Bearer {}", api_key))
            .header("Content-Type", "application/json")
            .json(&body)
            .send()
            .await
            .context("Failed to send streaming request to OpenAI responses API")?;

        // Surface non-2xx responses as formatted errors before streaming.
        let status = response.status();
        if !status.is_success() {
            let body = response.text().await.unwrap_or_default();
            anyhow::bail!(Self::format_openai_api_error(status, &body, &request.model));
        }

        // Incrementally parse the SSE byte stream; transport errors become
        // Error chunks rather than aborting the stream.
        let stream = async_stream::stream! {
            let mut parser = ResponsesSseParser::default();
            let mut byte_stream = response.bytes_stream();

            while let Some(result) = byte_stream.next().await {
                match result {
                    Ok(bytes) => {
                        for chunk in Self::parse_responses_sse_bytes(&mut parser, &bytes) {
                            yield chunk;
                        }
                    }
                    Err(error) => yield StreamChunk::Error(error.to_string()),
                }
            }

            // Flush any buffered partial event at end of stream.
            for chunk in Self::finish_responses_sse_parser(&mut parser) {
                yield chunk;
            }
        };

        Ok(Box::pin(stream))
    }
1900
    /// Stream a completion over the Responses WebSocket.
    ///
    /// Sends a single `response.create` event, then yields parsed chunks from
    /// incoming events until a terminal `Done`/`Error` chunk, socket close, or
    /// receive error. The connection is closed (best-effort) on exit.
    /// `backend` is only a label for logging; `ws_backend` shapes the payload.
    async fn complete_stream_with_realtime(
        &self,
        request: CompletionRequest,
        access_token: String,
        chatgpt_account_id: Option<String>,
        backend: &'static str,
        ws_backend: ResponsesWsBackend,
    ) -> Result<BoxStream<'static, StreamChunk>> {
        let (model, reasoning_effort, service_tier) =
            Self::resolve_model_and_reasoning_effort_and_service_tier(&request.model);
        let body = Self::build_responses_ws_create_event_for_backend(
            &request,
            &model,
            reasoning_effort,
            service_tier,
            ws_backend,
        );
        tracing::info!(
            backend = backend,
            instructions_len = body
                .get("instructions")
                .and_then(|v| v.as_str())
                .map(str::len)
                .unwrap_or(0),
            input_items = body
                .get("input")
                .and_then(|v| v.as_array())
                .map(Vec::len)
                .unwrap_or(0),
            has_tools = body
                .get("tools")
                .and_then(|v| v.as_array())
                .is_some_and(|tools| !tools.is_empty()),
            "Sending responses websocket request"
        );

        // Connection errors propagate to the caller, which falls back to HTTP.
        let connection = self
            .connect_responses_ws_with_token(
                &access_token,
                chatgpt_account_id.as_deref(),
                ws_backend,
            )
            .await?;

        let stream = async_stream::stream! {
            let mut connection = connection;
            let mut parser = ResponsesSseParser::default();
            if let Err(error) = connection.send_event(&body).await {
                yield StreamChunk::Error(error.to_string());
                let _ = connection.close().await;
                return;
            }

            // Track whether a terminal chunk was seen so a premature socket
            // close can be reported as an error.
            let mut saw_terminal = false;
            loop {
                match connection.recv_event().await {
                    Ok(Some(event)) => {
                        let mut chunks = Vec::new();
                        Self::parse_responses_event(&mut parser, &event, &mut chunks);
                        for chunk in chunks {
                            if matches!(chunk, StreamChunk::Done { .. } | StreamChunk::Error(_)) {
                                saw_terminal = true;
                            }
                            yield chunk;
                        }
                        if saw_terminal {
                            break;
                        }
                    }
                    Ok(None) => {
                        if !saw_terminal {
                            yield StreamChunk::Error("Realtime WebSocket closed before response completion".to_string());
                        }
                        break;
                    }
                    Err(error) => {
                        yield StreamChunk::Error(error.to_string());
                        break;
                    }
                }
            }

            let _ = connection.close().await;
        };

        Ok(Box::pin(stream))
    }
1988}
1989
#[async_trait]
impl Provider for OpenAiCodexProvider {
    fn name(&self) -> &str {
        "openai-codex"
    }

    /// List the models this provider exposes. The costs are all zero here;
    /// access is covered by the subscription/OAuth rather than metered billing
    /// — TODO confirm that is the intent. When the ChatGPT backend is active,
    /// the list is filtered down to the models that backend supports.
    async fn list_models(&self) -> Result<Vec<ModelInfo>> {
        let mut models = vec![
            ModelInfo {
                id: "gpt-5".to_string(),
                name: "GPT-5".to_string(),
                provider: "openai-codex".to_string(),
                context_window: 400_000,
                max_output_tokens: Some(128_000),
                supports_vision: false,
                supports_tools: true,
                supports_streaming: true,
                input_cost_per_million: Some(0.0),
                output_cost_per_million: Some(0.0),
            },
            ModelInfo {
                id: "gpt-5-mini".to_string(),
                name: "GPT-5 Mini".to_string(),
                provider: "openai-codex".to_string(),
                context_window: 264_000,
                max_output_tokens: Some(64_000),
                supports_vision: false,
                supports_tools: true,
                supports_streaming: true,
                input_cost_per_million: Some(0.0),
                output_cost_per_million: Some(0.0),
            },
            ModelInfo {
                id: "gpt-5.1-codex".to_string(),
                name: "GPT-5.1 Codex".to_string(),
                provider: "openai-codex".to_string(),
                context_window: 400_000,
                max_output_tokens: Some(128_000),
                supports_vision: false,
                supports_tools: true,
                supports_streaming: true,
                input_cost_per_million: Some(0.0),
                output_cost_per_million: Some(0.0),
            },
            ModelInfo {
                id: "gpt-5.2".to_string(),
                name: "GPT-5.2".to_string(),
                provider: "openai-codex".to_string(),
                context_window: 400_000,
                max_output_tokens: Some(128_000),
                supports_vision: false,
                supports_tools: true,
                supports_streaming: true,
                input_cost_per_million: Some(0.0),
                output_cost_per_million: Some(0.0),
            },
            ModelInfo {
                id: "gpt-5.3-codex".to_string(),
                name: "GPT-5.3 Codex".to_string(),
                provider: "openai-codex".to_string(),
                context_window: 400_000,
                max_output_tokens: Some(128_000),
                supports_vision: false,
                supports_tools: true,
                supports_streaming: true,
                input_cost_per_million: Some(0.0),
                output_cost_per_million: Some(0.0),
            },
            ModelInfo {
                id: "gpt-5.4".to_string(),
                name: "GPT-5.4".to_string(),
                provider: "openai-codex".to_string(),
                context_window: 272_000,
                max_output_tokens: Some(128_000),
                supports_vision: false,
                supports_tools: true,
                supports_streaming: true,
                input_cost_per_million: Some(0.0),
                output_cost_per_million: Some(0.0),
            },
            ModelInfo {
                id: "gpt-5.4-fast".to_string(),
                name: "GPT-5.4 Fast".to_string(),
                provider: "openai-codex".to_string(),
                context_window: 272_000,
                max_output_tokens: Some(128_000),
                supports_vision: false,
                supports_tools: true,
                supports_streaming: true,
                input_cost_per_million: Some(0.0),
                output_cost_per_million: Some(0.0),
            },
            ModelInfo {
                id: "gpt-5.4-pro".to_string(),
                name: "GPT-5.4 Pro".to_string(),
                provider: "openai-codex".to_string(),
                context_window: 272_000,
                max_output_tokens: Some(128_000),
                supports_vision: false,
                supports_tools: true,
                supports_streaming: true,
                input_cost_per_million: Some(0.0),
                output_cost_per_million: Some(0.0),
            },
            ModelInfo {
                id: "o3".to_string(),
                name: "O3".to_string(),
                provider: "openai-codex".to_string(),
                context_window: 200_000,
                max_output_tokens: Some(100_000),
                supports_vision: true,
                supports_tools: true,
                supports_streaming: true,
                input_cost_per_million: Some(0.0),
                output_cost_per_million: Some(0.0),
            },
            ModelInfo {
                id: "o4-mini".to_string(),
                name: "O4 Mini".to_string(),
                provider: "openai-codex".to_string(),
                context_window: 200_000,
                max_output_tokens: Some(100_000),
                supports_vision: true,
                supports_tools: true,
                supports_streaming: true,
                input_cost_per_million: Some(0.0),
                output_cost_per_million: Some(0.0),
            },
        ];

        // The ChatGPT subscription backend supports only a subset of models.
        if self.using_chatgpt_backend() {
            models.retain(|model| Self::chatgpt_supported_models().contains(&model.id.as_str()));
        }

        Ok(models)
    }

    /// Non-streaming completion; dispatches to the ChatGPT or OpenAI backend
    /// after validating that the requested model is allowed on that backend.
    async fn complete(&self, request: CompletionRequest) -> Result<CompletionResponse> {
        self.validate_model_for_backend(&request.model)?;
        let access_token = self.get_access_token().await?;
        if self.using_chatgpt_backend() {
            return self
                .complete_with_chatgpt_responses(request, access_token)
                .await;
        }
        self.complete_with_openai_responses(request, access_token)
            .await
    }

    /// Streaming completion; same backend dispatch as `complete`.
    async fn complete_stream(
        &self,
        request: CompletionRequest,
    ) -> Result<BoxStream<'static, StreamChunk>> {
        self.validate_model_for_backend(&request.model)?;
        let access_token = self.get_access_token().await?;
        if self.using_chatgpt_backend() {
            return self
                .complete_stream_with_chatgpt_responses(request, access_token)
                .await;
        }
        self.complete_stream_with_openai_responses(request, access_token)
            .await
    }
}
2154
2155#[cfg(test)]
2156mod tests {
2157 use super::*;
2158 use futures::stream;
2159 use tokio::io::duplex;
2160 use tokio_tungstenite::{
2161 accept_hdr_async, client_async,
2162 tungstenite::{
2163 Message as WsMessage,
2164 handshake::server::{Request as ServerRequest, Response as ServerResponse},
2165 },
2166 };
2167
    // PKCE pair must be non-empty and the challenge must differ from the verifier.
    #[test]
    fn test_generate_pkce() {
        let pkce = OpenAiCodexProvider::generate_pkce();
        assert!(!pkce.verifier.is_empty());
        assert!(!pkce.challenge.is_empty());
        assert_ne!(pkce.verifier, pkce.challenge);
    }
2175
    // OAuth state parameter is expected to be exactly 32 characters long.
    #[test]
    fn test_generate_state() {
        let state = OpenAiCodexProvider::generate_state();
        assert_eq!(state.len(), 32);
    }
2181
    // A 401 with a missing `model.request` scope should produce guidance that
    // names the scope and the re-auth command.
    #[test]
    fn formats_scope_error_with_actionable_message() {
        let body = r#"{"error":{"message":"Missing scopes: model.request"}}"#;
        let msg = OpenAiCodexProvider::format_openai_api_error(
            StatusCode::UNAUTHORIZED,
            body,
            "gpt-5.3-codex",
        );
        assert!(msg.contains("model.request"));
        assert!(msg.contains("codetether auth codex"));
    }
2193
    // A `:high` suffix is stripped from the model and mapped to the high level.
    #[test]
    fn parses_model_suffix_for_thinking_level() {
        let (model, level) =
            OpenAiCodexProvider::resolve_model_and_reasoning_effort("gpt-5.3-codex:high");
        assert_eq!(model, "gpt-5.3-codex");
        assert_eq!(level.map(ThinkingLevel::as_str), Some("high"));
    }
2201
    // The `-fast` alias resolves to the base model plus the "priority" tier,
    // while still honoring the thinking-level suffix.
    #[test]
    fn maps_fast_model_alias_to_priority_service_tier() {
        let (model, level, service_tier) =
            OpenAiCodexProvider::resolve_model_and_reasoning_effort_and_service_tier(
                "gpt-5.4-fast:high",
            );
        assert_eq!(model, "gpt-5.4");
        assert_eq!(level.map(ThinkingLevel::as_str), Some("high"));
        assert_eq!(service_tier.map(CodexServiceTier::as_str), Some("priority"));
    }
2212
    // Unrecognized suffixes are left on the model name untouched, with no level.
    #[test]
    fn ignores_unknown_model_suffix() {
        let (model, level) =
            OpenAiCodexProvider::resolve_model_and_reasoning_effort("gpt-5.3-codex:turbo");
        assert_eq!(model, "gpt-5.3-codex:turbo");
        assert_eq!(level, None);
    }
2220
    // Models without reasoning-effort support drop the level even when a valid
    // suffix is present.
    #[test]
    fn skips_reasoning_effort_for_non_reasoning_models() {
        let (model, level) = OpenAiCodexProvider::resolve_model_and_reasoning_effort("gpt-4o:high");
        assert_eq!(model, "gpt-4o");
        assert_eq!(level, None);
    }
2227
    // Default (ChatGPT-backed) provider lists gpt-5.4 and the fast alias but
    // filters out the pro variant.
    #[tokio::test]
    async fn lists_gpt_5_4_models() {
        let provider = OpenAiCodexProvider::new();
        let models = provider
            .list_models()
            .await
            .expect("model listing should succeed");

        assert!(models.iter().any(|model| model.id == "gpt-5.4"));
        assert!(models.iter().any(|model| model.id == "gpt-5.4-fast"));
        assert!(!models.iter().any(|model| model.id == "gpt-5.4-pro"));
    }
2240
    // Validation rejects the pro model on the ChatGPT backend with a clear message.
    #[test]
    fn rejects_pro_model_for_chatgpt_backend() {
        let provider = OpenAiCodexProvider::new();
        let err = provider
            .validate_model_for_backend("gpt-5.4-pro")
            .expect_err("chatgpt backend should reject unsupported model");
        assert!(
            err.to_string()
                .contains("not supported when using Codex with a ChatGPT account")
        );
    }
2252
    // The API-key backend imposes no ChatGPT model restrictions.
    #[test]
    fn allows_pro_model_for_api_key_backend() {
        let provider = OpenAiCodexProvider::from_api_key("test-key".to_string());
        provider
            .validate_model_for_backend("gpt-5.4-pro")
            .expect("api key backend should allow pro model");
    }
2260
    // The fast alias (with a thinking-level suffix) passes validation on ChatGPT.
    #[test]
    fn allows_fast_alias_for_chatgpt_backend() {
        let provider = OpenAiCodexProvider::new();
        provider
            .validate_model_for_backend("gpt-5.4-fast:high")
            .expect("chatgpt backend should allow fast alias");
    }
2268
    // Account id is read from the `https://api.openai.com/auth` claim of an
    // (unsigned, hand-built) JWT payload.
    #[test]
    fn extracts_chatgpt_account_id_from_jwt_claims() {
        let header = URL_SAFE_NO_PAD.encode(r#"{"alg":"none","typ":"JWT"}"#);
        let payload = URL_SAFE_NO_PAD
            .encode(r#"{"https://api.openai.com/auth":{"chatgpt_account_id":"org_test123"}}"#);
        let jwt = format!("{header}.{payload}.sig");

        let account_id = OpenAiCodexProvider::extract_chatgpt_account_id(&jwt);
        assert_eq!(account_id.as_deref(), Some("org_test123"));
    }
2279
2280 #[test]
2281 fn extracts_chatgpt_account_id_from_organizations_claim() {
2282 let header = URL_SAFE_NO_PAD.encode(r#"{"alg":"none","typ":"JWT"}"#);
2283 let payload = URL_SAFE_NO_PAD.encode(r#"{"organizations":[{"id":"org_from_list"}]}"#);
2284 let jwt = format!("{header}.{payload}.sig");
2285
2286 let account_id = OpenAiCodexProvider::extract_chatgpt_account_id(&jwt);
2287 assert_eq!(account_id.as_deref(), Some("org_from_list"));
2288 }
2289
2290 #[test]
2291 fn extracts_responses_instructions_from_system_messages() {
2292 let messages = vec![
2293 Message {
2294 role: Role::System,
2295 content: vec![ContentPart::Text {
2296 text: "first system block".to_string(),
2297 }],
2298 },
2299 Message {
2300 role: Role::User,
2301 content: vec![ContentPart::Text {
2302 text: "user message".to_string(),
2303 }],
2304 },
2305 Message {
2306 role: Role::System,
2307 content: vec![ContentPart::Text {
2308 text: "second system block".to_string(),
2309 }],
2310 },
2311 ];
2312
2313 let instructions = OpenAiCodexProvider::extract_responses_instructions(&messages);
2314 assert_eq!(instructions, "first system block\n\nsecond system block");
2315 }
2316
2317 #[test]
2318 fn falls_back_to_default_responses_instructions_without_system_message() {
2319 let messages = vec![Message {
2320 role: Role::User,
2321 content: vec![ContentPart::Text {
2322 text: "only user".to_string(),
2323 }],
2324 }];
2325
2326 let instructions = OpenAiCodexProvider::extract_responses_instructions(&messages);
2327 assert_eq!(instructions, DEFAULT_RESPONSES_INSTRUCTIONS);
2328 }
2329
2330 #[test]
2331 fn responses_input_ignores_system_messages() {
2332 let messages = vec![
2333 Message {
2334 role: Role::System,
2335 content: vec![ContentPart::Text {
2336 text: "system".to_string(),
2337 }],
2338 },
2339 Message {
2340 role: Role::User,
2341 content: vec![ContentPart::Text {
2342 text: "user".to_string(),
2343 }],
2344 },
2345 ];
2346
2347 let input = OpenAiCodexProvider::convert_messages_to_responses_input(&messages);
2348 assert_eq!(input.len(), 1);
2349 assert_eq!(input[0].get("role").and_then(Value::as_str), Some("user"));
2350 }
2351
2352 #[test]
2353 fn responses_ws_request_uses_bearer_auth() {
2354 let request = OpenAiCodexProvider::build_responses_ws_request_with_base_url(
2355 "wss://example.com/v1/responses",
2356 "test-token",
2357 )
2358 .expect("request should build");
2359
2360 assert_eq!(request.uri().to_string(), "wss://example.com/v1/responses");
2361 assert_eq!(
2362 request
2363 .headers()
2364 .get("Authorization")
2365 .and_then(|v| v.to_str().ok()),
2366 Some("Bearer test-token")
2367 );
2368 assert_eq!(
2369 request
2370 .headers()
2371 .get("User-Agent")
2372 .and_then(|v| v.to_str().ok()),
2373 Some("codetether-responses-ws/1.0")
2374 );
2375 }
2376
2377 #[test]
2378 fn responses_ws_request_includes_chatgpt_account_id_when_provided() {
2379 let request = OpenAiCodexProvider::build_responses_ws_request_with_base_url_and_account_id(
2380 "wss://example.com/v1/responses",
2381 "test-token",
2382 Some("org_123"),
2383 )
2384 .expect("request should build");
2385
2386 assert_eq!(
2387 request
2388 .headers()
2389 .get("chatgpt-account-id")
2390 .and_then(|v| v.to_str().ok()),
2391 Some("org_123")
2392 );
2393 }
2394
2395 #[test]
2396 fn builds_responses_ws_create_event_for_tools() {
2397 let request = CompletionRequest {
2398 messages: vec![
2399 Message {
2400 role: Role::System,
2401 content: vec![ContentPart::Text {
2402 text: "System prompt".to_string(),
2403 }],
2404 },
2405 Message {
2406 role: Role::User,
2407 content: vec![ContentPart::Text {
2408 text: "Inspect the repo".to_string(),
2409 }],
2410 },
2411 ],
2412 tools: vec![ToolDefinition {
2413 name: "read".to_string(),
2414 description: "Read a file".to_string(),
2415 parameters: json!({
2416 "type": "object",
2417 "properties": {
2418 "path": { "type": "string" }
2419 },
2420 "required": ["path"]
2421 }),
2422 }],
2423 model: "gpt-5.4".to_string(),
2424 temperature: None,
2425 top_p: None,
2426 max_tokens: Some(8192),
2427 stop: Vec::new(),
2428 };
2429
2430 let event = OpenAiCodexProvider::build_responses_ws_create_event(
2431 &request,
2432 "gpt-5.4",
2433 Some(ThinkingLevel::High),
2434 None,
2435 );
2436
2437 assert_eq!(
2438 event.get("type").and_then(Value::as_str),
2439 Some("response.create")
2440 );
2441 assert_eq!(event.get("model").and_then(Value::as_str), Some("gpt-5.4"));
2442 assert_eq!(event.get("store").and_then(Value::as_bool), Some(false));
2443 assert_eq!(
2444 event.get("max_output_tokens").and_then(Value::as_u64),
2445 Some(8192)
2446 );
2447 assert_eq!(
2448 event.get("tools").and_then(Value::as_array).map(Vec::len),
2449 Some(1)
2450 );
2451 }
2452
    #[tokio::test]
    async fn responses_ws_connection_round_trips_json_events() {
        // End-to-end round trip over an in-memory duplex pipe: the client side
        // performs a real websocket handshake against a spawned in-process
        // server, sends one JSON event, and reads one JSON event back. No
        // network sockets are involved.
        let (client_io, server_io) = duplex(16 * 1024);

        let server = tokio::spawn(async move {
            // Handshake callback: verify the upgrade request hit the expected
            // path and carried the bearer token before accepting.
            let callback = |request: &ServerRequest, response: ServerResponse| {
                assert_eq!(request.uri().path(), "/v1/responses");
                assert_eq!(
                    request
                        .headers()
                        .get("Authorization")
                        .and_then(|v| v.to_str().ok()),
                    Some("Bearer test-token")
                );
                Ok(response)
            };

            let mut socket = accept_hdr_async(server_io, callback)
                .await
                .expect("server websocket handshake should succeed");
            // Expect exactly one text frame containing a `response.create` event.
            let message = socket
                .next()
                .await
                .expect("server should receive message")
                .expect("server message should be valid");
            match message {
                WsMessage::Text(text) => {
                    let event: Value =
                        serde_json::from_str(&text).expect("server should parse JSON event");
                    assert_eq!(
                        event.get("type").and_then(Value::as_str),
                        Some("response.create")
                    );
                }
                other => panic!("expected text frame, got {other:?}"),
            }

            // Reply with a minimal `response.created` event for the client to read.
            socket
                .send(WsMessage::Text(
                    json!({ "type": "response.created" }).to_string().into(),
                ))
                .await
                .expect("server should send response");
        });

        // Client side: build the upgrade request (the host part is irrelevant
        // here since I/O goes through the duplex pipe) and handshake over it.
        let request = OpenAiCodexProvider::build_responses_ws_request_with_base_url(
            "ws://localhost/v1/responses",
            "test-token",
        )
        .expect("client request should build");
        let (stream, _) = client_async(request, client_io)
            .await
            .expect("client websocket handshake should succeed");
        let mut connection = OpenAiRealtimeConnection::new(stream);

        // Send one event, then read the server's reply through the connection
        // wrapper, confirming JSON-in/JSON-out round-tripping.
        connection
            .send_event(&json!({ "type": "response.create", "model": "gpt-5.4", "input": [] }))
            .await
            .expect("client should send event");
        let event = connection
            .recv_event()
            .await
            .expect("client should read event")
            .expect("client should receive session event");
        assert_eq!(
            event.get("type").and_then(Value::as_str),
            Some("response.created")
        );
        connection
            .close()
            .await
            .expect("client should close cleanly");

        server.await.expect("server task should finish");
    }
2528
2529 #[test]
2530 fn responses_sse_parser_buffers_split_tool_call_events() {
2531 let mut parser = ResponsesSseParser::default();
2532
2533 let chunk1 = br#"data: {"type":"response.output_item.added","item":{"type":"function_call","id":"fc_1","call_id":"call_1","name":"bash"}}
2534
2535da"#;
2536 let first = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, chunk1);
2537 assert_eq!(first.len(), 1);
2538 match &first[0] {
2539 StreamChunk::ToolCallStart { id, name } => {
2540 assert_eq!(id, "call_1");
2541 assert_eq!(name, "bash");
2542 }
2543 other => panic!("expected tool start, got {other:?}"),
2544 }
2545
2546 let chunk2 = br#"ta: {"type":"response.function_call_arguments.delta","item_id":"fc_1","delta":"{\"command\":"}
2547
2548data: {"type":"response.function_call_arguments.delta","item_id":"fc_1","delta":"\"ls\"}"}
2549
2550data: {"type":"response.output_item.done","item":{"type":"function_call","id":"fc_1","call_id":"call_1","name":"bash","arguments":"{\"command\":\"ls\"}"}}
2551
2552data: {"type":"response.completed","response":{"usage":{"input_tokens":11,"output_tokens":7,"total_tokens":18}}}
2553
2554"#;
2555 let second = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, chunk2);
2556
2557 assert_eq!(second.len(), 4);
2558 match &second[0] {
2559 StreamChunk::ToolCallDelta {
2560 id,
2561 arguments_delta,
2562 } => {
2563 assert_eq!(id, "call_1");
2564 assert_eq!(arguments_delta, "{\"command\":");
2565 }
2566 other => panic!("expected first tool delta, got {other:?}"),
2567 }
2568 match &second[1] {
2569 StreamChunk::ToolCallDelta {
2570 id,
2571 arguments_delta,
2572 } => {
2573 assert_eq!(id, "call_1");
2574 assert_eq!(arguments_delta, "\"ls\"}");
2575 }
2576 other => panic!("expected second tool delta, got {other:?}"),
2577 }
2578 match &second[2] {
2579 StreamChunk::ToolCallEnd { id } => assert_eq!(id, "call_1"),
2580 other => panic!("expected tool end, got {other:?}"),
2581 }
2582 match &second[3] {
2583 StreamChunk::Done { usage } => {
2584 let usage = usage.as_ref().expect("expected usage");
2585 assert_eq!(usage.prompt_tokens, 11);
2586 assert_eq!(usage.completion_tokens, 7);
2587 assert_eq!(usage.total_tokens, 18);
2588 }
2589 other => panic!("expected done, got {other:?}"),
2590 }
2591 }
2592
2593 #[test]
2594 fn responses_sse_parser_falls_back_to_done_item_arguments() {
2595 let mut parser = ResponsesSseParser::default();
2596 let bytes = br#"data: {"type":"response.output_item.done","item":{"type":"function_call","id":"fc_2","call_id":"call_2","name":"read","arguments":"{\"path\":\"src/main.rs\"}"}}
2597
2598"#;
2599
2600 let chunks = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, bytes);
2601 assert_eq!(chunks.len(), 3);
2602 match &chunks[0] {
2603 StreamChunk::ToolCallStart { id, name } => {
2604 assert_eq!(id, "call_2");
2605 assert_eq!(name, "read");
2606 }
2607 other => panic!("expected tool start, got {other:?}"),
2608 }
2609 match &chunks[1] {
2610 StreamChunk::ToolCallDelta {
2611 id,
2612 arguments_delta,
2613 } => {
2614 assert_eq!(id, "call_2");
2615 assert_eq!(arguments_delta, "{\"path\":\"src/main.rs\"}");
2616 }
2617 other => panic!("expected tool delta, got {other:?}"),
2618 }
2619 match &chunks[2] {
2620 StreamChunk::ToolCallEnd { id } => assert_eq!(id, "call_2"),
2621 other => panic!("expected tool end, got {other:?}"),
2622 }
2623 }
2624
2625 #[test]
2626 fn responses_sse_parser_flushes_final_event_without_trailing_blank_line() {
2627 let mut parser = ResponsesSseParser::default();
2628 let bytes = br#"data: {"type":"response.output_item.done","item":{"type":"function_call","id":"fc_3","call_id":"call_3","name":"read","arguments":"{\"path\":\"src/lib.rs\"}"}}"#;
2629
2630 let first = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, bytes);
2631 assert!(first.is_empty());
2632
2633 let flushed = OpenAiCodexProvider::finish_responses_sse_parser(&mut parser);
2634 assert_eq!(flushed.len(), 3);
2635 match &flushed[0] {
2636 StreamChunk::ToolCallStart { id, name } => {
2637 assert_eq!(id, "call_3");
2638 assert_eq!(name, "read");
2639 }
2640 other => panic!("expected tool start, got {other:?}"),
2641 }
2642 match &flushed[1] {
2643 StreamChunk::ToolCallDelta {
2644 id,
2645 arguments_delta,
2646 } => {
2647 assert_eq!(id, "call_3");
2648 assert_eq!(arguments_delta, "{\"path\":\"src/lib.rs\"}");
2649 }
2650 other => panic!("expected tool delta, got {other:?}"),
2651 }
2652 match &flushed[2] {
2653 StreamChunk::ToolCallEnd { id } => assert_eq!(id, "call_3"),
2654 other => panic!("expected tool end, got {other:?}"),
2655 }
2656 }
2657
2658 #[tokio::test]
2659 async fn collect_stream_completion_updates_tool_name_after_early_delta() {
2660 let stream = stream::iter(vec![
2661 StreamChunk::ToolCallDelta {
2662 id: "call_4".to_string(),
2663 arguments_delta: "{\"path\":\"src/provider/openai_codex.rs\"}".to_string(),
2664 },
2665 StreamChunk::ToolCallStart {
2666 id: "call_4".to_string(),
2667 name: "read".to_string(),
2668 },
2669 StreamChunk::Done { usage: None },
2670 ]);
2671
2672 let response = OpenAiCodexProvider::collect_stream_completion(Box::pin(stream))
2673 .await
2674 .expect("stream completion should succeed");
2675
2676 assert!(matches!(
2677 response.message.content.first(),
2678 Some(ContentPart::ToolCall { id, name, arguments, .. })
2679 if id == "call_4"
2680 && name == "read"
2681 && arguments == "{\"path\":\"src/provider/openai_codex.rs\"}"
2682 ));
2683 }
2684}