Skip to main content

byokey_provider/executor/
codex_ws.rs

//! Codex WebSocket executor.
//!
//! Connects to `ChatGPT`'s WebSocket endpoint and translates the Codex
//! Responses protocol to `OpenAI` chat completion SSE format.  Uses the same
//! SSE translator as the HTTP executor
//! ([`super::codex::translate_codex_responses_sse`]).
7
8use crate::registry;
9use aigw_openai::{ResponsesRequestConfig, build_responses_create_request};
10use async_trait::async_trait;
11use byokey_auth::AuthManager;
12use byokey_types::{
13    ByokError, ChatRequest, ProviderId,
14    traits::{ByteStream, ProviderExecutor, ProviderResponse, Result},
15};
16use bytes::Bytes;
17use futures_util::{SinkExt as _, StreamExt as _, stream::try_unfold};
18use serde_json::Value;
19use std::sync::Arc;
20use tokio_tungstenite::tungstenite;
21
22use super::codex::translate_codex_responses_sse;
23
/// WebSocket URL on `chatgpt.com` that serves the Codex Responses API.
const WS_URL: &str = "wss://chatgpt.com/backend-api/codex/ws";

/// Value sent in the `OpenAI-Beta` header to opt into the WebSocket protocol.
const WS_BETA: &str = "responses_websockets=2026-02-06";

/// `User-Agent` string that mirrors the one sent by the Codex CLI binary.
const CODEX_USER_AGENT: &str = concat!(
    "codex-tui/0.120.0 ",
    "(Mac OS 26.0.1; arm64) ",
    "Apple_Terminal/464"
);
32
33/// WebSocket-based executor for the Codex API.
34///
35/// Each call to [`chat_completion`] opens a fresh WebSocket connection
36/// (no connection pooling in this initial implementation).
37pub struct CodexWsExecutor {
38    auth: Arc<AuthManager>,
39}
40
41impl CodexWsExecutor {
42    /// Creates a new WebSocket executor backed by the given [`AuthManager`].
43    pub fn new(auth: Arc<AuthManager>) -> Self {
44        Self { auth }
45    }
46
47    /// Retrieves an OAuth access token from the auth manager.
48    async fn token(&self) -> Result<String> {
49        let tok = self.auth.get_token(&ProviderId::Codex).await?;
50        Ok(tok.access_token)
51    }
52}
53
54#[async_trait]
55impl ProviderExecutor for CodexWsExecutor {
56    async fn chat_completion(&self, request: ChatRequest) -> Result<ProviderResponse> {
57        let token = self.token().await?;
58
59        // Translate the OpenAI chat request to Codex format via aigateway's
60        // ResponsesRequestTranslator (codex preset).
61        let aigw_request: aigw_core::model::ChatRequest =
62            serde_json::from_value(request.into_body())
63                .map_err(|e: serde_json::Error| ByokError::Translation(e.to_string()))?;
64        let responses_req =
65            build_responses_create_request(&aigw_request, &ResponsesRequestConfig::codex())
66                .map_err(|e| ByokError::Translation(e.to_string()))?;
67        let mut codex_body = serde_json::to_value(&responses_req)
68            .map_err(|e: serde_json::Error| ByokError::Translation(e.to_string()))?;
69        codex_body["stream"] = Value::Bool(true);
70        codex_body["type"] = Value::String("response.create".into());
71
72        // Build the WebSocket handshake request with required headers.
73        let ws_request = http::Request::builder()
74            .uri(WS_URL)
75            .header("Authorization", format!("Bearer {token}"))
76            .header("OpenAI-Beta", WS_BETA)
77            .header("User-Agent", CODEX_USER_AGENT)
78            .header("Originator", "codex_cli_rs")
79            .header(
80                "Sec-WebSocket-Key",
81                tungstenite::handshake::client::generate_key(),
82            )
83            .header("Sec-WebSocket-Version", "13")
84            .header("Connection", "Upgrade")
85            .header("Upgrade", "websocket")
86            .header("Host", "chatgpt.com")
87            .body(())
88            .map_err(|e| ByokError::Http(format!("failed to build WS request: {e}")))?;
89
90        // Connect to the WebSocket endpoint.
91        let (ws_stream, _response) = tokio_tungstenite::connect_async(ws_request)
92            .await
93            .map_err(|e| ByokError::Http(format!("WebSocket connect failed: {e}")))?;
94
95        let (mut sink, stream) = ws_stream.split();
96
97        // Send the translated request body as a text message.
98        let payload = serde_json::to_string(&codex_body)
99            .map_err(|e| ByokError::Http(format!("failed to serialize body: {e}")))?;
100        sink.send(tungstenite::Message::Text(payload.into()))
101            .await
102            .map_err(|e| ByokError::Http(format!("WebSocket send failed: {e}")))?;
103
104        // Convert the incoming WebSocket messages into SSE-formatted bytes,
105        // then pipe through the existing Codex SSE translator.
106        let raw_stream: ByteStream = Box::pin(try_unfold(stream, |mut ws_rx| async move {
107            loop {
108                match ws_rx.next().await {
109                    Some(Ok(tungstenite::Message::Text(text))) => {
110                        // Wrap each JSON event in SSE "data:" framing so
111                        // the shared translator can parse it.
112                        let sse_line = format!("data: {text}\n\n");
113                        return Ok(Some((Bytes::from(sse_line), ws_rx)));
114                    }
115                    Some(Ok(tungstenite::Message::Close(_))) | None => {
116                        // Stream finished.
117                        return Ok(None);
118                    }
119                    Some(Ok(_)) => {
120                        // Ignore ping/pong/binary frames.
121                    }
122                    Some(Err(e)) => {
123                        return Err(ByokError::Http(format!("WebSocket recv error: {e}")));
124                    }
125                }
126            }
127        }));
128
129        Ok(ProviderResponse::Stream(translate_codex_responses_sse(
130            raw_stream,
131        )))
132    }
133
134    fn supported_models(&self) -> Vec<String> {
135        registry::models_for_provider(&ProviderId::Codex)
136    }
137}
138
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an executor backed by the crate's shared test auth manager.
    fn make_executor() -> CodexWsExecutor {
        let (_http_client, auth_manager) = crate::http_util::test_auth();
        CodexWsExecutor::new(auth_manager)
    }

    #[test]
    fn test_supported_models_non_empty() {
        let models = make_executor().supported_models();
        assert!(!models.is_empty());
    }

    #[test]
    fn test_supported_models_contains_o4_mini() {
        let models = make_executor().supported_models();
        let has_o4_mini = models.iter().any(|model| model == "o4-mini");
        assert!(has_o4_mini);
    }
}
159}