use std::collections::HashMap;
use std::ffi::OsString;
use std::net::TcpListener as StdTcpListener;
use std::path::{Path, PathBuf};
use std::process::ExitStatus;
use anyhow::{Context, Result};
use futures_util::stream::SplitSink;
use futures_util::{Sink, SinkExt, StreamExt};
use serde::Deserialize;
use serde_json::{Value, json};
use tokio::net::{TcpListener, TcpStream};
use tokio::process::{Child, Command};
use tokio::time::{Duration, Instant, sleep, timeout};
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
use tokio_tungstenite::tungstenite::handshake::server::{ErrorResponse, Request, Response};
use tokio_tungstenite::tungstenite::http::header::AUTHORIZATION;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, accept_hdr_async, connect_async};
use uuid::Uuid;
use crate::auto_switch::{self, AutoSwitchResult};
use crate::store;
use crate::token;
use crate::types::{AuthData, StoredAccount};
/// Name of the environment variable used to hand the websocket auth token to
/// the spawned Codex client (see `spawn_remote_codex`).
const REMOTE_TOKEN_ENV: &str = "CODEX_SWITCH_REMOTE_TOKEN";
/// How long `wait_for_app_server_ready` keeps probing before giving up.
const APP_SERVER_STARTUP_TIMEOUT: Duration = Duration::from_secs(10);
/// How long `initialize_app_server` waits for the `initialize` response.
const APP_SERVER_REQUEST_TIMEOUT: Duration = Duration::from_secs(20);
/// Leading part of every JSON-RPC request id generated by the proxy itself
/// (see `ProxyState::new`).
const INTERNAL_REQUEST_ID_PREFIX: &str = "codex-switch/";
/// Websocket stream type produced by `connect_async` for connections to the app-server.
type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
/// Websocket stream type produced by `accept_hdr_async` for clients of the local proxy.
type ProxyClientStream = WebSocketStream<TcpStream>;
/// Runs Codex against a private app-server, with a local websocket proxy in
/// between that can re-login the app-server when account limits are hit.
///
/// Sequence:
/// 1. validate the arguments and inject `-C <cwd>` when no working directory was given,
/// 2. perform an initial account auto-switch,
/// 3. start `codex app-server` on a reserved loopback port, protected by a
///    freshly generated token stored in a private file,
/// 4. bind the proxy listener and spawn the proxy task,
/// 5. start the Codex client pointed at the proxy and wait for it to exit.
///
/// The app-server process, the proxy task and the token file are torn down on
/// every path taken after they were created.
///
/// # Errors
///
/// Fails when the arguments are not supported, when the active account does
/// not support runtime auto-switch, when any helper process cannot be started
/// or does not become ready, or when the proxy task fails while Codex runs.
pub async fn run_codex(
    threshold: f64,
    codex_bin: String,
    codex_args: Vec<String>,
) -> Result<ExitStatus> {
    validate_remote_capable_codex_args(&codex_args)?;
    reject_remote_args(&codex_args)?;
    let current_dir = std::env::current_dir().context("Failed to read current directory")?;
    let codex_args = codex_args_with_default_cwd(&codex_args, &current_dir);

    let initial = auto_switch::auto_switch_allow_running(threshold)
        .await
        .context("Initial account auto-switch failed")?;
    if let AutoSwitchResult::ActiveUnsupported { reason, .. } = initial {
        anyhow::bail!("active account does not support runtime auto-switch: {reason}");
    }

    let port = reserve_local_port()?;
    let app_server_url = format!("ws://127.0.0.1:{port}");
    let token = websocket_token();
    let token_path = runtime_token_path()?;
    store::write_private_file(&token_path, &token)?;

    // From here on the token file exists, so every early return removes it.
    let mut app_server = match spawn_app_server(&codex_bin, &app_server_url, &token_path) {
        Ok(app_server) => app_server,
        Err(err) => {
            let _ = std::fs::remove_file(&token_path);
            return Err(err);
        }
    };
    if let Err(err) = wait_for_app_server_ready(&app_server_url, &token, &mut app_server).await {
        shutdown_child(&mut app_server).await;
        let _ = std::fs::remove_file(&token_path);
        return Err(err);
    }

    let (proxy_listener, proxy_url) = match bind_proxy_listener().await {
        Ok(listener) => listener,
        Err(err) => {
            shutdown_child(&mut app_server).await;
            let _ = std::fs::remove_file(&token_path);
            return Err(err);
        }
    };
    let mut proxy_task = tokio::spawn(run_websocket_proxy(
        proxy_listener,
        app_server_url.clone(),
        token.clone(),
        threshold,
    ));

    let mut codex_child = match spawn_remote_codex(&codex_bin, &codex_args, &proxy_url, &token)
        .context("Failed to start codex")
    {
        Ok(child) => child,
        Err(err) => {
            proxy_task.abort();
            let _ = proxy_task.await;
            shutdown_child(&mut app_server).await;
            let _ = std::fs::remove_file(&token_path);
            return Err(err);
        }
    };

    // Whichever finishes first decides the outcome: Codex exiting is the
    // normal case; the proxy finishing first is only fine if it returned Ok.
    let mut proxy_task_completed = false;
    let status = tokio::select! {
        status = codex_child.wait() => status.context("Failed to wait for codex"),
        proxy_result = &mut proxy_task => {
            proxy_task_completed = true;
            match proxy_result {
                Ok(Ok(())) => codex_child.wait().await.context("Failed to wait for codex"),
                Ok(Err(err)) => {
                    shutdown_child(&mut codex_child).await;
                    Err(err).context("Codex websocket proxy failed")
                }
                Err(err) => {
                    shutdown_child(&mut codex_child).await;
                    Err(anyhow::anyhow!("Codex websocket proxy task failed: {err}"))
                }
            }
        }
    };

    // A join handle that already completed must not be awaited again.
    if !proxy_task_completed {
        proxy_task.abort();
        let _ = proxy_task.await;
    }
    shutdown_child(&mut app_server).await;
    let _ = std::fs::remove_file(&token_path);
    status
}
/// Accepts only argument lists that start an interactive Codex session.
///
/// An empty list, a list starting with an option (leading `-`), or a list
/// starting with `resume` / `fork` is accepted; any other leading argument is
/// rejected.
fn validate_remote_capable_codex_args(args: &[String]) -> Result<()> {
    let accepted = match args.first().map(String::as_str) {
        None | Some("resume" | "fork") => true,
        Some(leading) => leading.starts_with('-'),
    };
    if !accepted {
        anyhow::bail!(
            "codex-switch run only supports Codex interactive commands that accept --remote: codex, codex resume, or codex fork"
        );
    }
    Ok(())
}
/// Rejects arguments that this wrapper supplies itself.
///
/// Both the bare form (`--flag`) and the assignment form (`--flag=value`) of
/// `--remote` and `--remote-auth-token-env` are refused, checked in that order.
fn reject_remote_args(args: &[String]) -> Result<()> {
    let reserved = [
        ("--remote", "do not pass --remote to codex-switch run"),
        (
            "--remote-auth-token-env",
            "do not pass --remote-auth-token-env to codex-switch run",
        ),
    ];
    for (flag, message) in reserved {
        let assignment_prefix = format!("{flag}=");
        let present = args
            .iter()
            .any(|arg| arg == flag || arg.starts_with(&assignment_prefix));
        if present {
            anyhow::bail!("{message}");
        }
    }
    Ok(())
}
/// Converts the arguments to `OsString`s, adding `-C <cwd>` when the caller
/// did not choose a working directory.
///
/// The pair is placed directly after a leading `resume` / `fork` subcommand,
/// otherwise at the very front.
fn codex_args_with_default_cwd(args: &[String], cwd: &Path) -> Vec<OsString> {
    if has_cwd_arg(args) {
        return args.iter().map(OsString::from).collect();
    }
    let leads_with_subcommand = matches!(
        args.first().map(String::as_str),
        Some("resume" | "fork")
    );
    let (head, tail) = args.split_at(usize::from(leads_with_subcommand));
    head.iter()
        .map(OsString::from)
        .chain([OsString::from("-C"), cwd.as_os_str().to_os_string()])
        .chain(tail.iter().map(OsString::from))
        .collect()
}
/// Reports whether a working-directory option (`-C`, `-C<dir>`, `--cd`,
/// `--cd=<dir>`) appears before the first `--` separator.
fn has_cwd_arg(args: &[String]) -> bool {
    for arg in args {
        match arg.as_str() {
            // Everything after `--` is positional and no longer inspected.
            "--" => return false,
            "-C" | "--cd" => return true,
            other if other.starts_with("--cd=") || is_short_cd_arg(other) => return true,
            _ => {}
        }
    }
    false
}
/// True for the attached short form `-C<dir>`, i.e. `-C` followed by at least
/// one more byte. The bare `-C` is not matched here.
fn is_short_cd_arg(arg: &str) -> bool {
    arg.strip_prefix("-C")
        .is_some_and(|attached| !attached.is_empty())
}
/// Asks the OS for a free loopback port by binding to port 0 and reading back
/// the assigned port.
///
/// The temporary listener is dropped when this function returns, so the port
/// is only a hint: nothing here prevents another process from taking it
/// before the caller uses it.
fn reserve_local_port() -> Result<u16> {
    let probe = StdTcpListener::bind(("127.0.0.1", 0))
        .context("Failed to reserve local app-server port")?;
    let bound_addr = probe
        .local_addr()
        .context("Failed to read local app-server port")?;
    Ok(bound_addr.port())
}
/// Binds the proxy's listener on an OS-assigned loopback port and returns it
/// together with the `ws://` URL clients should connect to.
async fn bind_proxy_listener() -> Result<(TcpListener, String)> {
    let bound = TcpListener::bind(("127.0.0.1", 0)).await;
    let listener = bound.context("Failed to bind local Codex websocket proxy")?;
    let local_addr = listener
        .local_addr()
        .context("Failed to read local Codex websocket proxy port")?;
    let url = format!("ws://127.0.0.1:{}", local_addr.port());
    Ok((listener, url))
}
/// Builds a fresh token by concatenating two random v4 UUIDs in their
/// `simple` (hyphen-free) rendering.
fn websocket_token() -> String {
    let first_half = Uuid::new_v4().simple();
    let second_half = Uuid::new_v4().simple();
    format!("{first_half}{second_half}")
}
/// Returns a unique token-file path: `<config dir>/runtime/ws-token-<uuid>`.
fn runtime_token_path() -> Result<PathBuf> {
    let runtime_dir = store::config_dir()?.join("runtime");
    let file_name = format!("ws-token-{}", Uuid::new_v4());
    Ok(runtime_dir.join(file_name))
}
/// Starts `<codex_bin> app-server` listening on `websocket_url`, configured
/// for capability-token auth with the token read from `token_path`.
///
/// All three standard streams of the child are connected to the null device.
fn spawn_app_server(codex_bin: &str, websocket_url: &str, token_path: &Path) -> Result<Child> {
    let mut command = Command::new(codex_bin);
    command
        .arg("app-server")
        .args(["--listen", websocket_url])
        .args(["--ws-auth", "capability-token"])
        .arg("--ws-token-file")
        .arg(token_path);
    command
        .stdin(std::process::Stdio::null())
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null());
    command
        .spawn()
        .with_context(|| format!("Failed to start Codex app-server with {codex_bin}"))
}
async fn wait_for_app_server_ready(
websocket_url: &str,
token: &str,
app_server: &mut Child,
) -> Result<()> {
let deadline = Instant::now() + APP_SERVER_STARTUP_TIMEOUT;
let mut last_error_message = None;
loop {
if Instant::now() >= deadline {
let detail =
last_error_message.unwrap_or_else(|| "connection was not ready".to_string());
anyhow::bail!("Timed out waiting for Codex app-server: {detail}");
}
if let Some(status) = app_server
.try_wait()
.context("Failed to inspect Codex app-server status")?
{
anyhow::bail!("Codex app-server exited during startup: {status}");
}
match probe_app_server(websocket_url, token).await {
Ok(()) => return Ok(()),
Err(err) => last_error_message = Some(err.to_string()),
}
sleep(Duration::from_millis(100)).await;
}
}
/// Performs one readiness probe: connect, run the `initialize` handshake, and
/// close the connection again.
async fn probe_app_server(websocket_url: &str, token: &str) -> Result<()> {
    let mut probe = connect_app_server_websocket(websocket_url, token).await?;
    initialize_app_server(&mut probe).await?;
    let closed = probe.close(None).await;
    closed.context("Failed to close Codex app-server probe websocket")
}
/// Starts the Codex client with the caller's arguments followed by
/// `--remote <websocket_url> --remote-auth-token-env <REMOTE_TOKEN_ENV>`.
///
/// The token itself travels through the child's environment, not its
/// command line.
fn spawn_remote_codex(
    codex_bin: &str,
    codex_args: &[OsString],
    websocket_url: &str,
    token: &str,
) -> Result<Child> {
    let child = Command::new(codex_bin)
        .args(codex_args)
        .args(["--remote", websocket_url])
        .args(["--remote-auth-token-env", REMOTE_TOKEN_ENV])
        .env(REMOTE_TOKEN_ENV, token)
        .spawn()?;
    Ok(child)
}
/// Best-effort termination of a child process; all errors are ignored.
///
/// A child that has already exited is left alone. A running child is killed
/// and then waited on. If the status cannot be determined, a kill is still
/// attempted but not waited for.
async fn shutdown_child(child: &mut Child) {
    let observed = child.try_wait();
    if matches!(observed, Ok(Some(_))) {
        return;
    }
    let known_running = observed.is_ok();
    let _ = child.start_kill();
    if known_running {
        let _ = child.wait().await;
    }
}
/// Serves proxy clients one at a time, forever.
///
/// For each accepted TCP connection: authenticate the websocket handshake,
/// open a matching connection to the app-server, relay traffic until either
/// side finishes, then drop the per-connection pending state. The function
/// only returns by propagating an error.
///
/// NOTE(review): a failed handshake (including one from an unauthorized
/// client) is propagated with `?` and therefore ends the whole proxy —
/// confirm that this is intended rather than skipping the bad connection.
async fn run_websocket_proxy(
    listener: TcpListener,
    app_server_url: String,
    token: String,
    threshold: f64,
) -> Result<()> {
    let mut state = ProxyState::new(threshold);
    loop {
        let accepted = listener.accept().await;
        let (client_stream, _peer_addr) =
            accepted.context("Failed to accept Codex websocket client")?;
        let downstream = accept_proxy_client(client_stream, &token).await?;
        let upstream = connect_app_server_websocket(&app_server_url, &token).await?;
        proxy_websockets(downstream, upstream, &mut state).await?;
        state.clear_connection_pending();
    }
}
/// Completes the websocket handshake with a proxy client, requiring an
/// `Authorization: Bearer <token>` header that matches `token` exactly.
///
/// Clients without a matching header get the response built by
/// `unauthorized_response` and the handshake fails.
// NOTE(review): the lint is presumably triggered by the handshake callback's
// error type, which the websocket library dictates — confirm.
#[allow(clippy::result_large_err)]
async fn accept_proxy_client(stream: TcpStream, token: &str) -> Result<ProxyClientStream> {
    let expected_auth = format!("Bearer {token}");
    let handshake = accept_hdr_async(stream, move |request: &Request, response: Response| {
        // A header that is missing or not valid text never matches.
        let presented = request
            .headers()
            .get(AUTHORIZATION)
            .and_then(|header| header.to_str().ok());
        if presented == Some(expected_auth.as_str()) {
            Ok(response)
        } else {
            Err(unauthorized_response())
        }
    });
    handshake
        .await
        .context("Failed to accept Codex websocket client")
}
/// Builds the HTTP 401 response sent to clients that fail the bearer check.
fn unauthorized_response() -> ErrorResponse {
    let body = Some(String::from("Unauthorized"));
    Response::builder()
        .status(401)
        .body(body)
        .expect("static unauthorized websocket response is valid")
}
/// Relays messages between one proxy client and the app-server until either
/// stream ends or one of the message handlers asks to stop.
///
/// Client messages go through `handle_client_proxy_message`; app-server
/// messages go through `handle_app_server_proxy_message`, which may also
/// write back to the app-server. Read and handler errors are propagated.
async fn proxy_websockets(
    client_websocket: ProxyClientStream,
    app_server_websocket: WsStream,
    state: &mut ProxyState,
) -> Result<()> {
    let (mut client_write, mut client_read) = client_websocket.split();
    let (mut app_server_write, mut app_server_read) = app_server_websocket.split();
    loop {
        // Each branch evaluates to "keep relaying?"; an ended stream means no.
        let keep_relaying = tokio::select! {
            inbound = client_read.next() => match inbound {
                None => false,
                Some(frame) => {
                    let frame = frame.context("Failed to read Codex websocket client message")?;
                    handle_client_proxy_message(frame, &mut app_server_write).await?
                }
            },
            inbound = app_server_read.next() => match inbound {
                None => false,
                Some(frame) => {
                    let frame =
                        frame.context("Failed to read Codex app-server websocket message")?;
                    handle_app_server_proxy_message(
                        frame,
                        &mut client_write,
                        &mut app_server_write,
                        state,
                    )
                    .await?
                }
            },
        };
        if !keep_relaying {
            return Ok(());
        }
    }
}
/// State the proxy keeps across client connections: the switch threshold,
/// bookkeeping for requests the proxy itself sends to the app-server, and
/// which account the app-server is (or is about to be) logged in as.
struct ProxyState {
/// Threshold handed to the auto-switch logic and to the rate-limit check.
threshold: f64,
/// Per-instance id prefix: `INTERNAL_REQUEST_ID_PREFIX` plus a random UUID.
request_prefix: String,
/// Counter appended to `request_prefix` for the next internal request; starts at 1.
next_request_id: u64,
/// Internal requests awaiting a response, keyed by their request id.
pending_internal: HashMap<String, PendingInternalRequest>,
/// Account id of the last login the app-server answered with a success result.
app_server_account_id: Option<String>,
/// Account id of a login request that was sent and has no response yet.
pending_login_account_id: Option<String>,
}
/// Kind of a request that the proxy itself sent to the app-server and whose
/// response must be consumed by the proxy rather than forwarded to the client.
enum PendingInternalRequest {
/// An `account/login/start` request for the given stored account id.
Login { account_id: String },
}
impl ProxyState {
    /// Creates empty state with a fresh, random request-id prefix.
    fn new(threshold: f64) -> Self {
        let request_prefix = format!("{INTERNAL_REQUEST_ID_PREFIX}{}", Uuid::new_v4());
        Self {
            threshold,
            request_prefix,
            next_request_id: 1,
            pending_internal: HashMap::new(),
            app_server_account_id: None,
            pending_login_account_id: None,
        }
    }

    /// Returns `<prefix>/<n>` and advances the counter.
    fn next_internal_request_id(&mut self) -> String {
        let sequence = self.next_request_id;
        self.next_request_id += 1;
        format!("{}/{sequence}", self.request_prefix)
    }

    /// Forgets requests that were in flight on a connection that has ended.
    /// The acknowledged app-server account id is kept.
    fn clear_connection_pending(&mut self) {
        self.pending_login_account_id = None;
        self.pending_internal.clear();
    }

    /// Runs the auto-switch logic and, if needed, asks the app-server to log
    /// in as the resulting account.
    ///
    /// A switched account always triggers a login attempt; a kept account
    /// only when the app-server is not already known to use it; an
    /// unsupported active account does nothing.
    async fn auto_switch_and_login(
        &mut self,
        app_server_write: &mut SplitSink<WsStream, Message>,
    ) -> Result<()> {
        let outcome = auto_switch::auto_switch_allow_running(self.threshold).await?;
        match outcome {
            AutoSwitchResult::Switched { to, .. } => {
                self.login_chatgpt_account(app_server_write, *to).await
            }
            AutoSwitchResult::ActiveKept { account, .. } => {
                let already_logged_in =
                    self.app_server_account_id.as_deref() == Some(account.id.as_str());
                if already_logged_in {
                    return Ok(());
                }
                self.login_chatgpt_account(app_server_write, *account)
                    .await
            }
            AutoSwitchResult::ActiveUnsupported { .. } => Ok(()),
        }
    }

    /// Sends an internal login request for `account` unless the app-server
    /// already uses that account or a login for it is still pending.
    ///
    /// The request is registered as pending only after it was sent.
    async fn login_chatgpt_account(
        &mut self,
        app_server_write: &mut SplitSink<WsStream, Message>,
        account: StoredAccount,
    ) -> Result<()> {
        let target_id = Some(account.id.as_str());
        let already_active = self.app_server_account_id.as_deref() == target_id;
        let already_pending = self.pending_login_account_id.as_deref() == target_id;
        if already_active || already_pending {
            return Ok(());
        }

        let request_id = self.next_internal_request_id();
        let request = login_request(Value::String(request_id.clone()), &account).await?;
        send_json(app_server_write, request).await?;

        let account_id = account.id;
        self.pending_internal.insert(
            request_id,
            PendingInternalRequest::Login {
                account_id: account_id.clone(),
            },
        );
        self.pending_login_account_id = Some(account_id);
        Ok(())
    }

    /// Consumes `value` if it answers one of the proxy's own requests.
    ///
    /// Returns `true` when the message's string `id` matched a pending
    /// internal request (which is then removed), `false` otherwise. A login
    /// response clears the pending marker for that account and, when it
    /// carries a result, records the account as the app-server's account.
    fn handle_internal_response(&mut self, value: &Value) -> bool {
        let matched = value
            .get("id")
            .and_then(Value::as_str)
            .and_then(|request_id| self.pending_internal.remove(request_id));
        let Some(pending) = matched else {
            return false;
        };
        match pending {
            PendingInternalRequest::Login { account_id } => {
                let is_current_attempt =
                    self.pending_login_account_id.as_deref() == Some(account_id.as_str());
                if is_current_attempt {
                    self.pending_login_account_id = None;
                }
                if response_to_result(value).is_ok() {
                    self.app_server_account_id = Some(account_id);
                }
            }
        }
        true
    }
}
/// Forwards a client message to the app-server unchanged.
///
/// Returns `Ok(false)` when the forwarded message was a close frame, telling
/// the relay loop to stop; `Ok(true)` otherwise.
async fn handle_client_proxy_message(
    message: Message,
    app_server_write: &mut SplitSink<WsStream, Message>,
) -> Result<bool> {
    let is_close = matches!(message, Message::Close(_));
    let forwarded = app_server_write.send(message).await;
    forwarded.context("Failed to forward Codex client message to app-server")?;
    Ok(!is_close)
}
/// Processes one message coming from the app-server.
///
/// * Responses to the proxy's own requests are consumed and not forwarded.
/// * `account/chatgptAuthTokens/refresh` requests are answered by the proxy
///   and not forwarded.
/// * Everything else is forwarded to the client; rate-limit updates and
///   usage-limit errors additionally trigger an auto-switch afterwards.
///
/// A failing auto-switch is deliberately ignored so the relay keeps running.
/// Returns `Ok(false)` when the message was a close frame.
async fn handle_app_server_proxy_message(
    message: Message,
    client_write: &mut SplitSink<ProxyClientStream, Message>,
    app_server_write: &mut SplitSink<WsStream, Message>,
    state: &mut ProxyState,
) -> Result<bool> {
    let mut forward_to_client = true;
    let mut switch_needed = false;

    if let Some(payload) = message_json(&message)? {
        if state.handle_internal_response(&payload) {
            forward_to_client = false;
        } else {
            let is_refresh_request = is_jsonrpc_request(&payload)
                && payload.get("method").and_then(Value::as_str)
                    == Some("account/chatgptAuthTokens/refresh");
            if is_refresh_request {
                handle_server_request(app_server_write, payload).await?;
                forward_to_client = false;
            } else {
                switch_needed = rate_limit_notification_requires_switch(&payload, state.threshold)
                    || usage_limit_error_requires_switch(&payload);
            }
        }
    }

    let is_close = matches!(message, Message::Close(_));
    if forward_to_client {
        let forwarded = client_write.send(message).await;
        forwarded.context("Failed to forward Codex app-server message to client")?;
    }
    // The client sees the triggering message before any switch is attempted.
    if switch_needed {
        let _ = state.auto_switch_and_login(app_server_write).await;
    }
    Ok(!is_close)
}
/// Decides whether an `account/rateLimits/updated` notification calls for an
/// account switch at the given threshold.
///
/// Messages with another method, without `params`, or with `params` that do
/// not deserialize yield `false`; otherwise the decision is delegated to
/// `auto_switch::usage_requires_switch`.
fn rate_limit_notification_requires_switch(value: &Value, threshold: f64) -> bool {
    let is_rate_limit_update =
        value.get("method").and_then(Value::as_str) == Some("account/rateLimits/updated");
    if !is_rate_limit_update {
        return false;
    }
    let parsed = value
        .get("params")
        .cloned()
        .and_then(|raw| serde_json::from_value::<AccountRateLimitsUpdatedParams>(raw).ok());
    match parsed {
        Some(update) => {
            let usage = update.rate_limits.into_usage_info();
            auto_switch::usage_requires_switch(&usage, threshold)
        }
        None => false,
    }
}
fn usage_limit_error_requires_switch(value: &Value) -> bool {
if value.get("method").and_then(Value::as_str) != Some("error") {
return false;
}
if value
.pointer("/params/error/codexErrorInfo")
.and_then(Value::as_str)
== Some("usageLimitExceeded")
{
return true;
}
value
.pointer("/params/error/message")
.and_then(Value::as_str)
.is_some_and(|message| message.to_ascii_lowercase().contains("usage limit"))
}
/// `params` object of an `account/rateLimits/updated` notification.
/// Field names are read from their camelCase JSON keys.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct AccountRateLimitsUpdatedParams {
/// Read from the `rateLimits` key.
rate_limits: AppServerRateLimitSnapshot,
}
/// Rate-limit snapshot as carried in the app-server notification.
/// Every field is optional; keys are camelCase in the JSON payload.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct AppServerRateLimitSnapshot {
/// Copied verbatim into `UsageInfo::limit_id`.
limit_id: Option<String>,
/// Copied verbatim into `UsageInfo::limit_name`.
limit_name: Option<String>,
/// Source of the `primary_*` fields of `UsageInfo`.
primary: Option<AppServerRateLimitWindow>,
/// Source of the `secondary_*` fields of `UsageInfo`.
secondary: Option<AppServerRateLimitWindow>,
/// Source of the credit-related fields of `UsageInfo`.
credits: Option<AppServerCreditsSnapshot>,
/// Copied verbatim into `UsageInfo::plan_type`.
plan_type: Option<String>,
/// Copied verbatim into `UsageInfo::rate_limit_reached_type`.
rate_limit_reached_type: Option<String>,
}
impl AppServerRateLimitSnapshot {
    /// Converts the snapshot into the crate's `UsageInfo`.
    ///
    /// Window and credit sub-objects are flattened into the corresponding
    /// optional fields. `account_id` is left empty, `additional_limits` is
    /// empty and `error` is `None`, since the snapshot carries none of them.
    fn into_usage_info(self) -> crate::types::UsageInfo {
        let Self {
            limit_id,
            limit_name,
            primary,
            secondary,
            credits,
            plan_type,
            rate_limit_reached_type,
        } = self;
        let first_window = primary.as_ref();
        let second_window = secondary.as_ref();
        crate::types::UsageInfo {
            account_id: String::new(),
            limit_id,
            limit_name,
            plan_type,
            primary_used_percent: first_window.map(|w| w.used_percent),
            primary_window_minutes: first_window.and_then(|w| w.window_duration_mins),
            primary_resets_at: first_window.and_then(|w| w.resets_at),
            secondary_used_percent: second_window.map(|w| w.used_percent),
            secondary_window_minutes: second_window.and_then(|w| w.window_duration_mins),
            secondary_resets_at: second_window.and_then(|w| w.resets_at),
            has_credits: credits.as_ref().map(|c| c.has_credits),
            unlimited_credits: credits.as_ref().map(|c| c.unlimited),
            // Last use of `credits`, so the balance can be moved out.
            credits_balance: credits.and_then(|c| c.balance),
            rate_limit_reached_type,
            additional_limits: Vec::new(),
            error: None,
        }
    }
}
/// One rate-limit window inside a snapshot (used for both `primary` and
/// `secondary`). Keys are camelCase in the JSON payload.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct AppServerRateLimitWindow {
/// Required; the only non-optional field of a window.
used_percent: f64,
/// Presumably the window length in minutes (by name) — confirm against the app-server schema.
window_duration_mins: Option<i64>,
/// NOTE(review): looks like a Unix timestamp (the tests use 1_800_000_000) — confirm unit.
resets_at: Option<i64>,
}
/// Credit information inside a rate-limit snapshot. Keys are camelCase in
/// the JSON payload.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct AppServerCreditsSnapshot {
/// Mapped to `UsageInfo::has_credits`.
has_credits: bool,
/// Mapped to `UsageInfo::unlimited_credits`.
unlimited: bool,
/// Mapped to `UsageInfo::credits_balance`; transported as a string.
balance: Option<String>,
}
/// Opens a websocket connection to the app-server, authenticating with an
/// `Authorization: Bearer <token>` header.
///
/// # Errors
///
/// Fails when the URL cannot be turned into a client request, when the token
/// is not a valid header value, or when the connection attempt fails.
async fn connect_app_server_websocket(websocket_url: &str, token: &str) -> Result<WsStream> {
    let mut request = websocket_url
        .into_client_request()
        .with_context(|| format!("Invalid websocket URL: {websocket_url}"))?;
    let bearer = format!("Bearer {token}");
    request.headers_mut().insert(
        AUTHORIZATION,
        bearer.parse().context("Invalid websocket auth token")?,
    );
    let connected = connect_async(request).await;
    let (websocket, _handshake_response) = connected
        .with_context(|| format!("Failed to connect to Codex app-server at {websocket_url}"))?;
    Ok(websocket)
}
/// Sends the `initialize` request and waits for its response.
///
/// While waiting, non-text frames are skipped, requests coming from the
/// app-server are answered with an "unsupported request" error, and any other
/// message is ignored. The whole wait is limited by
/// `APP_SERVER_REQUEST_TIMEOUT`.
///
/// # Errors
///
/// Fails on timeout, when the connection closes or a read fails, when a text
/// frame is not valid JSON, or when the `initialize` response is an error.
async fn initialize_app_server(websocket: &mut WsStream) -> Result<()> {
    let handshake = json!({
        "id": "initialize",
        "method": "initialize",
        "params": {
            "clientInfo": {
                "name": "codex-switch",
                "title": null,
                "version": env!("CARGO_PKG_VERSION")
            },
            "capabilities": {
                "experimentalApi": true
            }
        }
    });
    send_json(websocket, handshake).await?;

    let deadline = Instant::now() + APP_SERVER_REQUEST_TIMEOUT;
    loop {
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            anyhow::bail!("Timed out waiting for Codex app-server initialize response");
        }
        let next_frame = timeout(remaining, websocket.next())
            .await
            .context("Timed out waiting for Codex app-server initialize response")?;
        let frame = next_frame
            .context("Codex app-server closed during initialize")?
            .context("Failed to read Codex app-server initialize response")?;
        let value = match message_json(&frame)? {
            Some(value) => value,
            None => continue,
        };
        let is_initialize_reply = value.get("id").and_then(Value::as_str) == Some("initialize");
        if is_initialize_reply {
            return response_to_result(&value);
        }
        if is_jsonrpc_request(&value) {
            let request_id = value.get("id").cloned();
            send_error_response(websocket, request_id, "unsupported request").await?;
        }
    }
}
async fn login_request(request_id: Value, account: &StoredAccount) -> Result<Value> {
let payload = external_auth_payload(account).await?;
Ok(json!({
"id": request_id,
"method": "account/login/start",
"params": {
"type": "chatgptAuthTokens",
"accessToken": payload.access_token,
"chatgptAccountId": payload.chatgpt_account_id,
"chatgptPlanType": payload.chatgpt_plan_type
}
}))
}
/// Answers a JSON-RPC request that the app-server sent to its client.
///
/// Only `account/chatgptAuthTokens/refresh` is supported: the matching
/// account's tokens are refreshed and returned as the result. A failed
/// refresh, or any other method, is answered with an error response. The
/// returned `Result` only reflects whether writing the reply succeeded.
async fn handle_server_request<S>(websocket: &mut S, request: Value) -> Result<()>
where
    S: Sink<Message> + Unpin,
    S::Error: std::error::Error + Send + Sync + 'static,
{
    let id = request.get("id").cloned();
    let is_refresh = request.get("method").and_then(Value::as_str)
        == Some("account/chatgptAuthTokens/refresh");
    if !is_refresh {
        return send_error_response(websocket, id, "unsupported request").await;
    }

    let previous_account_id = request
        .get("params")
        .and_then(|params| params.get("previousAccountId"))
        .and_then(Value::as_str)
        .map(str::to_owned);
    let reply = match refresh_external_auth(previous_account_id).await {
        Ok(payload) => json!({
            "id": id,
            "result": {
                "accessToken": payload.access_token,
                "chatgptAccountId": payload.chatgpt_account_id,
                "chatgptPlanType": payload.chatgpt_plan_type
            }
        }),
        Err(err) => return send_error_response(websocket, id, &err.to_string()).await,
    };
    send_json(websocket, reply).await
}
/// Looks up the account to refresh, refreshes its ChatGPT tokens and returns
/// the resulting auth payload.
async fn refresh_external_auth(previous_account_id: Option<String>) -> Result<ExternalAuthPayload> {
    let wanted_id = previous_account_id.as_deref();
    let stale_account = find_refresh_account(wanted_id)?;
    let fresh_account = token::refresh_chatgpt_tokens(&stale_account).await?;
    external_auth_payload(&fresh_account).await
}
/// Selects the stored account whose tokens should be refreshed.
///
/// With a ChatGPT account id, the first stored account carrying that id is
/// returned. Without one, the active account is returned, provided it is a
/// ChatGPT account.
///
/// # Errors
///
/// Fails when the account store cannot be loaded or no account qualifies.
fn find_refresh_account(previous_account_id: Option<&str>) -> Result<StoredAccount> {
    let loaded = store::load_accounts()?;
    match previous_account_id {
        Some(wanted) => loaded
            .accounts
            .into_iter()
            .find(|candidate| chatgpt_account_id(candidate).as_deref() == Some(wanted))
            .with_context(|| format!("No stored account matches ChatGPT account id {wanted}")),
        None => {
            let active_id = loaded.active_account_id.as_deref();
            loaded
                .accounts
                .into_iter()
                .find(|candidate| {
                    let is_active = Some(candidate.id.as_str()) == active_id;
                    is_active && matches!(candidate.auth_data, AuthData::ChatGPT { .. })
                })
                .context("No active ChatGPT account available for auth refresh")
        }
    }
}
/// Returns a copy of the ChatGPT account id stored with `account`, or `None`
/// for API-key accounts and ChatGPT accounts without an id.
fn chatgpt_account_id(account: &StoredAccount) -> Option<String> {
    if let AuthData::ChatGPT { account_id, .. } = &account.auth_data {
        account_id.clone()
    } else {
        None
    }
}
/// Credentials handed to the app-server, both in login requests and in
/// answers to token-refresh requests.
struct ExternalAuthPayload {
/// Sent as `accessToken`.
access_token: String,
/// Sent as `chatgptAccountId`; never blank (enforced by `external_auth_payload`).
chatgpt_account_id: String,
/// Sent as `chatgptPlanType`; taken from the stored account's plan type.
chatgpt_plan_type: Option<String>,
}
async fn external_auth_payload(account: &StoredAccount) -> Result<ExternalAuthPayload> {
let account = token::ensure_chatgpt_tokens_fresh(account).await?;
match account.auth_data {
AuthData::ChatGPT {
access_token,
account_id,
..
} => Ok(ExternalAuthPayload {
access_token,
chatgpt_account_id: account_id
.filter(|account_id| !account_id.trim().is_empty())
.context("ChatGPT account id is required for runtime switching")?,
chatgpt_plan_type: account.plan_type,
}),
AuthData::ApiKey { .. } => {
anyhow::bail!("API key accounts do not support runtime switching")
}
}
}
/// Writes a JSON-RPC error response with code `-32000` and the given message
/// for request `id` (serialized as `null` when absent).
async fn send_error_response<S>(websocket: &mut S, id: Option<Value>, message: &str) -> Result<()>
where
    S: Sink<Message> + Unpin,
    S::Error: std::error::Error + Send + Sync + 'static,
{
    let reply = json!({
        "id": id,
        "error": {
            "code": -32000,
            "message": message
        }
    });
    send_json(websocket, reply).await
}
/// Serializes `value` and writes it as a single websocket text frame.
async fn send_json<S>(websocket: &mut S, value: Value) -> Result<()>
where
    S: Sink<Message> + Unpin,
    S::Error: std::error::Error + Send + Sync + 'static,
{
    let frame = Message::Text(value.to_string().into());
    let written = websocket.send(frame).await;
    written.context("Failed to write Codex app-server websocket message")
}
/// Parses the JSON carried by a text frame.
///
/// Returns `Ok(None)` for every other frame kind (close, ping, pong, binary,
/// raw frame) and an error when a text frame is not valid JSON.
fn message_json(message: &Message) -> Result<Option<Value>> {
    let Message::Text(text) = message else {
        return Ok(None);
    };
    serde_json::from_str::<Value>(text.as_str())
        .map(Some)
        .context("Failed to parse Codex app-server websocket message")
}
/// Interprets a JSON-RPC response object.
///
/// An `error` member takes precedence and becomes an `Err` carrying its
/// `message` (or a generic text when that is missing). Otherwise a `result`
/// member means success, and a response with neither is an error.
fn response_to_result(value: &Value) -> Result<()> {
    match (value.get("error"), value.get("result")) {
        (Some(error), _) => {
            let message = error
                .get("message")
                .and_then(Value::as_str)
                .unwrap_or("Codex app-server request failed");
            Err(anyhow::anyhow!("{message}"))
        }
        (None, Some(_)) => Ok(()),
        (None, None) => Err(anyhow::anyhow!(
            "Codex app-server response did not include result"
        )),
    }
}
/// A message counts as a request when it has both a `method` and an `id`
/// member (notifications have no `id`, responses no `method`).
fn is_jsonrpc_request(value: &Value) -> bool {
    matches!(
        (value.get("method"), value.get("id")),
        (Some(_), Some(_))
    )
}
/// Unit tests for the pure helpers of this module: limit detection on
/// app-server messages and argument validation / working-directory injection.
#[cfg(test)]
mod tests {
    use std::ffi::OsString;
    use std::path::Path;

    use serde_json::json;

    use super::{
        codex_args_with_default_cwd, has_cwd_arg, rate_limit_notification_requires_switch,
        usage_limit_error_requires_switch, validate_remote_capable_codex_args,
    };

    // 96% used triggers at a 95% threshold but not at 97%.
    #[test]
    fn rate_limit_notification_triggers_when_threshold_is_reached() {
        let notification = json!({
            "method": "account/rateLimits/updated",
            "params": {
                "rateLimits": {
                    "limitId": "codex",
                    "limitName": null,
                    "primary": {
                        "usedPercent": 96,
                        "windowDurationMins": 300,
                        "resetsAt": 1_800_000_000
                    },
                    "secondary": null,
                    "credits": null,
                    "planType": "plus",
                    "rateLimitReachedType": null
                }
            }
        });
        assert!(rate_limit_notification_requires_switch(&notification, 95.0));
        assert!(!rate_limit_notification_requires_switch(
            &notification,
            97.0
        ));
    }

    // A reported limit type triggers even without any window data.
    #[test]
    fn rate_limit_notification_triggers_when_limit_type_is_set() {
        let notification = json!({
            "method": "account/rateLimits/updated",
            "params": {
                "rateLimits": {
                    "limitId": "codex",
                    "limitName": null,
                    "primary": null,
                    "secondary": null,
                    "credits": null,
                    "planType": "plus",
                    "rateLimitReachedType": "workspace_owner_usage_limit_reached"
                }
            }
        });
        assert!(rate_limit_notification_requires_switch(
            &notification,
            100.0
        ));
    }

    // Covers the message-text match, the error-info match, and a non-match.
    #[test]
    fn usage_limit_error_triggers_switch() {
        assert!(usage_limit_error_requires_switch(&json!({
            "method": "error",
            "params": {
                "error": {
                    "message": "You've hit your usage limit. Try again later.",
                    "codexErrorInfo": null,
                    "additionalDetails": null
                },
                "willRetry": false,
                "threadId": "thread",
                "turnId": "turn"
            }
        })));
        assert!(usage_limit_error_requires_switch(&json!({
            "method": "error",
            "params": {
                "error": {
                    "message": "rate limited",
                    "codexErrorInfo": "usageLimitExceeded",
                    "additionalDetails": null
                },
                "willRetry": false,
                "threadId": "thread",
                "turnId": "turn"
            }
        })));
        assert!(!usage_limit_error_requires_switch(&json!({
            "method": "error",
            "params": {
                "error": {
                    "message": "context window exceeded",
                    "codexErrorInfo": "contextWindowExceeded",
                    "additionalDetails": null
                },
                "willRetry": false,
                "threadId": "thread",
                "turnId": "turn"
            }
        })));
    }

    #[test]
    fn run_rejects_non_remote_codex_subcommands() {
        assert!(validate_remote_capable_codex_args(&[]).is_ok());
        assert!(validate_remote_capable_codex_args(&["resume".to_string()]).is_ok());
        assert!(validate_remote_capable_codex_args(&["fork".to_string()]).is_ok());
        assert!(validate_remote_capable_codex_args(&["--model".to_string()]).is_ok());
        assert!(validate_remote_capable_codex_args(&["exec".to_string()]).is_err());
    }

    #[test]
    fn run_injects_cwd_for_default_tui() {
        assert_eq!(
            codex_args_with_default_cwd(&[], Path::new("/repo")),
            vec![OsString::from("-C"), OsString::from("/repo")]
        );
    }

    #[test]
    fn run_injects_cwd_after_interactive_subcommand() {
        assert_eq!(
            codex_args_with_default_cwd(&["resume".to_string()], Path::new("/repo")),
            vec![
                OsString::from("resume"),
                OsString::from("-C"),
                OsString::from("/repo")
            ]
        );
        assert_eq!(
            codex_args_with_default_cwd(
                &["fork".to_string(), "session-id".to_string()],
                Path::new("/repo"),
            ),
            vec![
                OsString::from("fork"),
                OsString::from("-C"),
                OsString::from("/repo"),
                OsString::from("session-id")
            ]
        );
    }

    #[test]
    fn run_does_not_override_explicit_cwd() {
        assert!(has_cwd_arg(&["-C".to_string(), "/other".to_string()]));
        assert!(has_cwd_arg(&["--cd".to_string(), "/other".to_string()]));
        assert!(has_cwd_arg(&["--cd=/other".to_string()]));
        assert_eq!(
            codex_args_with_default_cwd(
                &["resume".to_string(), "--cd=/other".to_string()],
                Path::new("/repo"),
            ),
            vec![OsString::from("resume"), OsString::from("--cd=/other")]
        );
    }
}