1use anyhow::{Context, Result, bail};
2use base64::Engine;
3use chrono::{Local, TimeZone};
4use clap::{Args, Parser, Subcommand};
5use dirs::home_dir;
6use reqwest::blocking::Client;
7use serde::{Deserialize, Serialize};
8use std::borrow::Cow;
9use std::cmp::Reverse;
10use std::collections::hash_map::DefaultHasher;
11use std::collections::{BTreeMap, BTreeSet, VecDeque};
12use std::env;
13use std::ffi::OsString;
14use std::fs;
15use std::hash::{Hash, Hasher};
16use std::io::{self, BufRead, Cursor, Read, Write};
17use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
18use std::path::{Path, PathBuf};
19use std::process::{Command, ExitStatus, Stdio};
20use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
21use std::sync::mpsc::{self, Receiver, RecvTimeoutError, SyncSender, TrySendError};
22use std::sync::{Arc, Condvar, Mutex, OnceLock};
23use std::thread;
24use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
25use tiny_http::{
26 Header as TinyHeader, ReadWrite as TinyReadWrite, Response as TinyResponse,
27 Server as TinyServer, StatusCode as TinyStatusCode,
28};
29use tokio::runtime::{Builder as TokioRuntimeBuilder, Runtime as TokioRuntime};
30use tungstenite::client::IntoClientRequest;
31use tungstenite::error::UrlError as WsUrlError;
32use tungstenite::handshake::derive_accept_key;
33use tungstenite::http::{HeaderName as WsHeaderName, HeaderValue as WsHeaderValue};
34use tungstenite::protocol::Role as WsRole;
35use tungstenite::stream::MaybeTlsStream;
36use tungstenite::{
37 Error as WsError, HandshakeError as WsHandshakeError, Message as WsMessage,
38 WebSocket as WsSocket, client_tls_with_config,
39};
40
41mod app_commands;
42mod app_state;
43mod audit_log;
44mod cli_args;
45mod codex_config;
46mod command_dispatch;
47mod core_constants;
48mod housekeeping;
49mod profile_commands;
50mod profile_identity;
51mod quota_support;
52mod runtime_anthropic;
53mod runtime_background;
54mod runtime_broker;
55mod runtime_broker_shared;
56mod runtime_capabilities;
57mod runtime_caveman;
58mod runtime_claude;
59#[path = "runtime_tuning.rs"]
60mod runtime_config;
61mod runtime_core_shared;
62mod runtime_doctor;
63mod runtime_launch;
64mod runtime_launch_shared;
65mod runtime_mem;
66mod runtime_metrics;
67mod runtime_persistence;
68mod runtime_policy;
69mod runtime_proxy;
70mod runtime_proxy_shared;
71mod runtime_save_shared;
72mod runtime_state_shared;
73mod runtime_store;
74mod secret_store;
75mod shared_codex_fs;
76mod shared_types;
77#[path = "cli_render.rs"]
78mod terminal_ui;
79mod update_notice;
80
81#[cfg(feature = "bench-support")]
82#[doc(hidden)]
83pub mod bench_support;
84
85use app_commands::*;
86pub(crate) use app_state::*;
87use audit_log::*;
88pub(crate) use cli_args::*;
89pub(crate) use codex_config::*;
90pub(crate) use core_constants::*;
91use housekeeping::*;
92use profile_commands::*;
93use profile_identity::*;
94use quota_support::*;
95use runtime_anthropic::*;
96use runtime_background::*;
97use runtime_broker::*;
98use runtime_broker_shared::*;
99use runtime_capabilities::*;
100use runtime_caveman::*;
101use runtime_claude::*;
102use runtime_config::*;
103use runtime_core_shared::*;
104use runtime_doctor::*;
105use runtime_launch::*;
106use runtime_launch_shared::*;
107use runtime_mem::*;
108use runtime_persistence::*;
109use runtime_policy::*;
110use runtime_proxy::*;
111use runtime_proxy_shared::*;
112pub(crate) use runtime_save_shared::*;
113pub(crate) use runtime_state_shared::*;
114use runtime_store::*;
115use shared_codex_fs::*;
116pub(crate) use shared_types::*;
117use terminal_ui::*;
118use update_notice::*;
119
// Process-wide mutex used by `acquire_test_env_lock`; created lazily on first use.
#[cfg(test)]
static TEST_ENV_LOCK: OnceLock<Mutex<()>> = OnceLock::new();

#[cfg(test)]
thread_local! {
    // Per-thread count of live `TestEnvLockGuard`s. `acquire_test_env_lock` only locks
    // `TEST_ENV_LOCK` when this is zero, so nested acquisitions on one thread do not
    // try to lock the mutex a second time.
    static TEST_ENV_LOCK_DEPTH: std::cell::Cell<usize> = const { std::cell::Cell::new(0) };
}
127
/// Guard returned by `acquire_test_env_lock`.
///
/// Dropping it decrements the per-thread nesting depth (see its `Drop` impl) and, if it
/// owns the mutex guard, releases `TEST_ENV_LOCK`.
#[cfg(test)]
pub(crate) struct TestEnvLockGuard {
    // `Some` only for the outermost acquisition on a thread; nested acquisitions store
    // `None` because the thread already holds the mutex.
    _guard: Option<std::sync::MutexGuard<'static, ()>>,
}
132
/// Acquires the test environment lock, re-entrantly per thread.
///
/// The calling thread's nesting depth is incremented on every call. Only the outermost
/// call (depth was zero) locks `TEST_ENV_LOCK`; nested calls return a guard that merely
/// tracks depth. A poisoned mutex is recovered rather than treated as an error, so a
/// panic in one test does not cascade into every later test.
#[cfg(test)]
pub(crate) fn acquire_test_env_lock() -> TestEnvLockGuard {
    // Record the acquisition first; the depth is bumped even for nested calls.
    let outermost = TEST_ENV_LOCK_DEPTH.with(|depth| {
        let nesting = depth.get();
        depth.set(nesting + 1);
        nesting == 0
    });

    let held = outermost.then(|| {
        let mutex = TEST_ENV_LOCK.get_or_init(|| Mutex::new(()));
        match mutex.lock() {
            Ok(guard) => guard,
            // The protected value is `()`, so there is no state that poisoning could
            // have left inconsistent.
            Err(poisoned) => poisoned.into_inner(),
        }
    });

    TestEnvLockGuard { _guard: held }
}
152
/// Test helper that holds the test environment lock and, optionally, remembers one
/// environment variable's prior value so its `Drop` impl can restore it.
#[cfg(test)]
pub(crate) struct TestEnvVarGuard {
    // Environment lock held for the guard's lifetime. Fields are dropped only after
    // `Drop::drop` returns, so the restore in `drop` runs while this is still held.
    _lock: Option<TestEnvLockGuard>,
    // Variable to restore on drop; `None` when the guard was created via `lock()`.
    key: Option<&'static str>,
    // Value the variable had before the guard changed it; `None` means it was unset.
    previous: Option<OsString>,
}
159
#[cfg(test)]
impl TestEnvVarGuard {
    /// Takes the test environment lock without touching any variable.
    pub(crate) fn lock() -> Self {
        let held = acquire_test_env_lock();
        Self {
            _lock: Some(held),
            key: None,
            previous: None,
        }
    }

    /// Sets `key` to `value`, remembering the prior value for restoration on drop.
    pub(crate) fn set(key: &'static str, value: &str) -> Self {
        // SAFETY: `snapshot_then` runs this closure while the test environment lock is
        // held, which serialises environment access among tests that use these guards.
        Self::snapshot_then(key, || unsafe { env::set_var(key, value) })
    }

    /// Removes `key`, remembering the prior value for restoration on drop.
    pub(crate) fn unset(key: &'static str) -> Self {
        // SAFETY: `snapshot_then` runs this closure while the test environment lock is
        // held, which serialises environment access among tests that use these guards.
        Self::snapshot_then(key, || unsafe { env::remove_var(key) })
    }

    /// Locks the environment, captures the current value of `key`, then runs `mutate`.
    ///
    /// The order matters: the snapshot must be taken under the lock and before the
    /// mutation so that drop restores exactly what was there.
    fn snapshot_then(key: &'static str, mutate: impl FnOnce()) -> Self {
        let held = acquire_test_env_lock();
        let saved = env::var_os(key);
        mutate();
        Self {
            _lock: Some(held),
            key: Some(key),
            previous: saved,
        }
    }
}
194
#[cfg(test)]
impl Drop for TestEnvVarGuard {
    /// Restores the tracked variable (if any) to the state captured at construction.
    fn drop(&mut self) {
        // Guards created through `lock()` track no variable and have nothing to restore.
        let Some(key) = self.key else {
            return;
        };

        // SAFETY: the `_lock` field is still alive here because fields are dropped only
        // after this method returns, so the environment lock taken at construction still
        // serialises access among tests that use these guards.
        match self.previous.as_ref() {
            Some(original) => unsafe { env::set_var(key, original) },
            None => unsafe { env::remove_var(key) },
        }
    }
}
209
#[cfg(test)]
impl Drop for TestEnvLockGuard {
    /// Decrements this thread's nesting depth, never going below zero.
    ///
    /// The mutex guard, when this value owns one, is released afterwards as part of
    /// normal field drop.
    fn drop(&mut self) {
        TEST_ENV_LOCK_DEPTH.with(|depth| depth.set(depth.get().saturating_sub(1)));
    }
}
221
// Process-wide mutex used by `acquire_test_runtime_lock`; created lazily on first use.
#[cfg(test)]
static TEST_RUNTIME_LOCK: OnceLock<Mutex<()>> = OnceLock::new();

#[cfg(test)]
thread_local! {
    // Per-thread count of live `TestRuntimeLockGuard`s. `acquire_test_runtime_lock` only
    // locks `TEST_RUNTIME_LOCK` when this is zero, so nested acquisitions on one thread
    // do not try to lock the mutex a second time.
    static TEST_RUNTIME_LOCK_DEPTH: std::cell::Cell<usize> = const { std::cell::Cell::new(0) };
}
229
/// Guard returned by `acquire_test_runtime_lock`.
///
/// Dropping it decrements the per-thread nesting depth (see its `Drop` impl) and, if it
/// owns the mutex guard, releases `TEST_RUNTIME_LOCK`.
#[cfg(test)]
pub(crate) struct TestRuntimeLockGuard {
    // `Some` only for the outermost acquisition on a thread; nested acquisitions store
    // `None` because the thread already holds the mutex.
    _guard: Option<std::sync::MutexGuard<'static, ()>>,
}
234
/// Acquires the test runtime lock, re-entrantly per thread.
///
/// The calling thread's nesting depth is incremented on every call. Only the outermost
/// call (depth was zero) locks `TEST_RUNTIME_LOCK`; nested calls return a guard that
/// merely tracks depth. A poisoned mutex is recovered rather than treated as an error.
#[cfg(test)]
pub(crate) fn acquire_test_runtime_lock() -> TestRuntimeLockGuard {
    // Record the acquisition first; the depth is bumped even for nested calls.
    let outermost = TEST_RUNTIME_LOCK_DEPTH.with(|depth| {
        let nesting = depth.get();
        depth.set(nesting + 1);
        nesting == 0
    });

    let held = outermost.then(|| {
        let mutex = TEST_RUNTIME_LOCK.get_or_init(|| Mutex::new(()));
        match mutex.lock() {
            Ok(guard) => guard,
            // The protected value is `()`, so there is no state that poisoning could
            // have left inconsistent.
            Err(poisoned) => poisoned.into_inner(),
        }
    });

    TestRuntimeLockGuard { _guard: held }
}
254
#[cfg(test)]
impl Drop for TestRuntimeLockGuard {
    /// Decrements this thread's nesting depth, never going below zero.
    ///
    /// The mutex guard, when this value owns one, is released afterwards as part of
    /// normal field drop.
    fn drop(&mut self) {
        TEST_RUNTIME_LOCK_DEPTH.with(|depth| depth.set(depth.get().saturating_sub(1)));
    }
}
266
#[cfg(test)]
mod test_env_guard_tests {
    use super::*;

    /// Nested `TestEnvVarGuard::set` calls on one thread must not deadlock, and each
    /// guard must restore the value that was present when it was created.
    #[test]
    fn test_env_var_guard_restores_previous_value_and_supports_nested_reentry() {
        const KEY: &str = "PRODEX_TEST_ENV_GUARD_REENTRY";
        let read = || env::var(KEY).ok();
        let before = env::var_os(KEY);

        {
            let _outer = TestEnvVarGuard::set(KEY, "outer");
            assert_eq!(read().as_deref(), Some("outer"));

            {
                let _inner = TestEnvVarGuard::set(KEY, "inner");
                assert_eq!(read().as_deref(), Some("inner"));
            }

            // Dropping the inner guard restores the outer guard's value.
            assert_eq!(read().as_deref(), Some("outer"));
        }

        // Dropping the outer guard restores whatever was there before the test.
        assert_eq!(env::var_os(KEY), before);
    }
}
291
// State bundle for one runtime rotation proxy instance.
//
// NOTE(review): only field names and types are visible at this definition. The per-field
// notes below are inferred from them and should be confirmed against the code that
// constructs and uses this struct.
struct RuntimeRotationProxy {
    // Shared handle to the `tiny_http` server.
    server: Arc<TinyServer>,
    // Shared boolean flag; presumably set to ask workers to stop — TODO confirm.
    shutdown: Arc<AtomicBool>,
    // Join handles for spawned worker threads.
    worker_threads: Vec<thread::JoinHandle<()>>,
    // Presumably how many of the workers are accept loops — TODO confirm.
    accept_worker_count: usize,
    // Socket address associated with the proxy listener.
    listen_addr: std::net::SocketAddr,
    // Filesystem path of the proxy log.
    log_path: PathBuf,
    // Shared counter; presumably the number of in-flight requests — TODO confirm.
    active_request_count: Arc<AtomicUsize>,
    // Optional state-file lock; presumably marks this process as the owner — TODO confirm.
    owner_lock: Option<StateFileLock>,
}
302
// WebSocket layered over a boxed `tiny_http` read/write stream (presumably the
// client-facing side of the proxy — confirm against callers).
type RuntimeLocalWebSocket = WsSocket<Box<dyn TinyReadWrite + Send>>;
// WebSocket layered over a plain or TLS-wrapped `TcpStream`, as produced by
// tungstenite's client connection helpers.
type RuntimeUpstreamWebSocket = WsSocket<MaybeTlsStream<TcpStream>>;
305
306fn runtime_set_upstream_websocket_io_timeout(
307 socket: &mut RuntimeUpstreamWebSocket,
308 timeout: Option<Duration>,
309) -> io::Result<()> {
310 match socket.get_mut() {
311 MaybeTlsStream::Plain(stream) => {
312 stream.set_read_timeout(timeout)?;
313 stream.set_write_timeout(timeout)?;
314 }
315 MaybeTlsStream::Rustls(stream) => {
316 stream.sock.set_read_timeout(timeout)?;
317 stream.sock.set_write_timeout(timeout)?;
318 }
319 _ => {}
320 }
321 Ok(())
322}
323
324fn runtime_websocket_timeout_error(err: &WsError) -> bool {
325 matches!(
326 err,
327 WsError::Io(io_err)
328 if matches!(
329 io_err.kind(),
330 io::ErrorKind::TimedOut | io::ErrorKind::WouldBlock
331 )
332 )
333}
334
335fn runtime_proxy_log(shared: &RuntimeRotationProxyShared, message: impl AsRef<str>) {
336 runtime_proxy_log_to_path(&shared.log_path, message.as_ref());
337}
338
339fn runtime_proxy_next_request_id(shared: &RuntimeRotationProxyShared) -> u64 {
340 shared.request_sequence.fetch_add(1, Ordering::Relaxed)
341}
342
343pub fn main_entry() {
344 if let Err(err) = run() {
345 eprintln!("Error: {err:#}");
346 std::process::exit(1);
347 }
348}
349
350fn run() -> Result<()> {
351 let command = parse_cli_command_or_exit();
352 if command.should_show_update_notice() {
353 let _ = show_update_notice_if_available(&command);
354 }
355 ensure_runtime_policy_valid()?;
356 command.execute()
357}
358
359fn parse_cli_command_or_exit() -> Commands {
360 match parse_cli_command_from(env::args_os()) {
361 Ok(command) => command,
362 Err(err) => err.exit(),
363 }
364}
365
366fn parse_cli_command_from<I, T>(args: I) -> std::result::Result<Commands, clap::Error>
367where
368 I: IntoIterator<Item = T>,
369 T: Into<OsString>,
370{
371 let raw_args = args.into_iter().map(Into::into).collect::<Vec<_>>();
372 let parse_args = if should_default_cli_invocation_to_run(&raw_args) {
373 rewrite_cli_args_as_run(&raw_args)
374 } else {
375 raw_args
376 };
377 Ok(Cli::try_parse_from(parse_args)?.command)
378}
379
/// Decides whether an invocation should be treated as an implicit `run`.
///
/// `args` is the full argument vector including the program name at index 0. The result
/// is `true` when there is no first real argument, when it is not valid UTF-8, or when
/// it is not one of the reserved flags or subcommand names listed below. Only the
/// argument at index 1 is inspected, and the comparison is case-sensitive.
fn should_default_cli_invocation_to_run(args: &[OsString]) -> bool {
    // First arguments that must reach clap untouched.
    const RESERVED_FIRST_ARGS: &[&str] = &[
        "-h",
        "--help",
        "-V",
        "--version",
        "profile",
        "use",
        "current",
        "info",
        "doctor",
        "audit",
        "cleanup",
        "login",
        "logout",
        "quota",
        "run",
        "caveman",
        "super",
        "claude",
        "help",
        "__runtime-broker",
    ];

    match args.get(1).and_then(|arg| arg.to_str()) {
        Some(first_arg) => !RESERVED_FIRST_ARGS.contains(&first_arg),
        None => true,
    }
}
408
/// Returns a copy of `args` with `run` inserted directly after the program name.
///
/// If `args` is empty, `prodex` is used as the program name, so the result is always at
/// least `["<program>", "run"]`.
fn rewrite_cli_args_as_run(args: &[OsString]) -> Vec<OsString> {
    let (program, rest) = match args.split_first() {
        Some((program, rest)) => (program.clone(), rest),
        None => (OsString::from("prodex"), args),
    };

    let mut rewritten = Vec::with_capacity(args.len() + 1);
    rewritten.push(program);
    rewritten.push(OsString::from("run"));
    rewritten.extend_from_slice(rest);
    rewritten
}
420
/// Returns the Codex executable to launch: the value of `PRODEX_CODEX_BIN` when that
/// variable is set (even if empty), otherwise `codex`.
fn codex_bin() -> OsString {
    match env::var_os("PRODEX_CODEX_BIN") {
        Some(configured) => configured,
        None => OsString::from("codex"),
    }
}
424
/// Returns the Claude executable to launch: the value of `PRODEX_CLAUDE_BIN` when that
/// variable is set (even if empty), otherwise `claude`.
fn claude_bin() -> OsString {
    match env::var_os("PRODEX_CLAUDE_BIN") {
        Some(configured) => configured,
        None => OsString::from("claude"),
    }
}
428
// Test-only module whose source lives outside `src/`, at
// `../tests/support/main_internal_harness.rs` relative to this file. Being a child of
// this module, it can reach this file's private items.
#[cfg(test)]
#[path = "../tests/support/main_internal_harness.rs"]
mod main_internal_tests;

// Test-only module loaded from `../tests/support/compat_replay_body.rs`.
#[cfg(test)]
#[path = "../tests/support/compat_replay_body.rs"]
mod compat_replay_tests;