Skip to main content

prodex/
lib.rs

1use anyhow::{Context, Result, bail};
2use base64::Engine;
3use chrono::{Local, TimeZone};
4use clap::{Args, Parser, Subcommand};
5use dirs::home_dir;
6use reqwest::blocking::Client;
7use serde::{Deserialize, Serialize};
8use std::borrow::Cow;
9use std::cmp::Reverse;
10use std::collections::hash_map::DefaultHasher;
11use std::collections::{BTreeMap, BTreeSet, VecDeque};
12use std::env;
13use std::ffi::OsString;
14use std::fs;
15use std::hash::{Hash, Hasher};
16use std::io::{self, BufRead, Cursor, Read, Write};
17use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
18use std::path::{Path, PathBuf};
19use std::process::{Command, ExitStatus, Stdio};
20use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
21use std::sync::mpsc::{self, Receiver, RecvTimeoutError, SyncSender, TrySendError};
22use std::sync::{Arc, Condvar, Mutex, OnceLock};
23use std::thread;
24use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
25use tiny_http::{
26    Header as TinyHeader, ReadWrite as TinyReadWrite, Response as TinyResponse,
27    Server as TinyServer, StatusCode as TinyStatusCode,
28};
29use tokio::runtime::{Builder as TokioRuntimeBuilder, Runtime as TokioRuntime};
30use tungstenite::client::IntoClientRequest;
31use tungstenite::error::UrlError as WsUrlError;
32use tungstenite::handshake::derive_accept_key;
33use tungstenite::http::{HeaderName as WsHeaderName, HeaderValue as WsHeaderValue};
34use tungstenite::protocol::Role as WsRole;
35use tungstenite::stream::MaybeTlsStream;
36use tungstenite::{
37    Error as WsError, HandshakeError as WsHandshakeError, Message as WsMessage,
38    WebSocket as WsSocket, client_tls_with_config,
39};
40
41mod app_commands;
42mod app_state;
43mod audit_log;
44mod cli_args;
45mod codex_config;
46mod command_dispatch;
47mod core_constants;
48mod housekeeping;
49mod profile_commands;
50mod profile_identity;
51mod quota_support;
52mod runtime_anthropic;
53mod runtime_background;
54mod runtime_broker;
55mod runtime_broker_shared;
56mod runtime_capabilities;
57mod runtime_caveman;
58mod runtime_claude;
59#[path = "runtime_tuning.rs"]
60mod runtime_config;
61mod runtime_core_shared;
62mod runtime_doctor;
63mod runtime_launch;
64mod runtime_launch_shared;
65mod runtime_mem;
66mod runtime_metrics;
67mod runtime_persistence;
68mod runtime_policy;
69mod runtime_proxy;
70mod runtime_proxy_shared;
71mod runtime_save_shared;
72mod runtime_state_shared;
73mod runtime_store;
74mod secret_store;
75mod shared_codex_fs;
76mod shared_types;
77#[path = "cli_render.rs"]
78mod terminal_ui;
79mod update_notice;
80
81#[cfg(feature = "bench-support")]
82#[doc(hidden)]
83pub mod bench_support;
84
85use app_commands::*;
86pub(crate) use app_state::*;
87use audit_log::*;
88pub(crate) use cli_args::*;
89pub(crate) use codex_config::*;
90pub(crate) use core_constants::*;
91use housekeeping::*;
92use profile_commands::*;
93use profile_identity::*;
94use quota_support::*;
95use runtime_anthropic::*;
96use runtime_background::*;
97use runtime_broker::*;
98use runtime_broker_shared::*;
99use runtime_capabilities::*;
100use runtime_caveman::*;
101use runtime_claude::*;
102use runtime_config::*;
103use runtime_core_shared::*;
104use runtime_doctor::*;
105use runtime_launch::*;
106use runtime_launch_shared::*;
107use runtime_mem::*;
108use runtime_persistence::*;
109use runtime_policy::*;
110use runtime_proxy::*;
111use runtime_proxy_shared::*;
112pub(crate) use runtime_save_shared::*;
113pub(crate) use runtime_state_shared::*;
114use runtime_store::*;
115use shared_codex_fs::*;
116pub(crate) use shared_types::*;
117use terminal_ui::*;
118use update_notice::*;
119
/// Process-wide mutex backing [`acquire_test_env_lock`]; initialised on first use.
#[cfg(test)]
static TEST_ENV_LOCK: OnceLock<Mutex<()>> = OnceLock::new();

#[cfg(test)]
thread_local! {
    /// Per-thread count of env-lock acquisitions whose guards are still alive.
    static TEST_ENV_LOCK_DEPTH: std::cell::Cell<usize> = const { std::cell::Cell::new(0) };
}

/// Guard returned by [`acquire_test_env_lock`].
///
/// `_guard` holds the real mutex guard only for the outermost acquisition on a
/// thread; nested acquisitions carry `None`.
#[cfg(test)]
pub(crate) struct TestEnvLockGuard {
    _guard: Option<std::sync::MutexGuard<'static, ()>>,
}

/// Takes the shared test-environment lock, allowing re-entry on the same thread.
///
/// The per-thread depth counter is bumped on every call. Only the call that
/// finds the counter at zero actually locks [`TEST_ENV_LOCK`]; a poisoned mutex
/// is recovered via `into_inner` instead of panicking.
#[cfg(test)]
pub(crate) fn acquire_test_env_lock() -> TestEnvLockGuard {
    let depth_before = TEST_ENV_LOCK_DEPTH.with(|depth| {
        let seen = depth.get();
        depth.set(seen + 1);
        seen
    });

    // Nested calls on this thread already own the mutex through the outer guard.
    let held = (depth_before == 0).then(|| {
        TEST_ENV_LOCK
            .get_or_init(|| Mutex::new(()))
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    });

    TestEnvLockGuard { _guard: held }
}
152
/// Test helper that holds the shared env lock and, when it changed a variable,
/// puts the previous value back on drop.
#[cfg(test)]
pub(crate) struct TestEnvVarGuard {
    _lock: Option<TestEnvLockGuard>,
    /// Variable this guard modified; `None` for a lock-only guard.
    key: Option<&'static str>,
    /// Value the variable had before the guard touched it, if it was set.
    previous: Option<OsString>,
}

#[cfg(test)]
impl TestEnvVarGuard {
    /// Acquires the env lock without modifying any variable.
    pub(crate) fn lock() -> Self {
        let lock = acquire_test_env_lock();
        Self {
            _lock: Some(lock),
            key: None,
            previous: None,
        }
    }

    /// Sets `key` to `value`, remembering the prior value for restoration on drop.
    pub(crate) fn set(key: &'static str, value: &str) -> Self {
        Self::replace(key, Some(value))
    }

    /// Removes `key`, remembering the prior value for restoration on drop.
    pub(crate) fn unset(key: &'static str) -> Self {
        Self::replace(key, None)
    }

    /// Shared body of `set`/`unset`: lock first, snapshot the old value, then mutate.
    fn replace(key: &'static str, replacement: Option<&str>) -> Self {
        let lock = acquire_test_env_lock();
        let previous = env::var_os(key);
        match replacement {
            // SAFETY: test env mutation is serialized by the shared env lock guard.
            Some(value) => unsafe { env::set_var(key, value) },
            // SAFETY: test env mutation is serialized by the shared env lock guard.
            None => unsafe { env::remove_var(key) },
        }
        Self {
            _lock: Some(lock),
            key: Some(key),
            previous,
        }
    }
}

#[cfg(test)]
impl Drop for TestEnvVarGuard {
    /// Restores the remembered value (or removes the variable if it was unset before).
    fn drop(&mut self) {
        // Lock-only guards never touched the environment.
        let Some(key) = self.key else {
            return;
        };
        match self.previous.as_ref() {
            // SAFETY: test env mutation is serialized by the shared env lock guard.
            Some(value) => unsafe { env::set_var(key, value) },
            // SAFETY: test env mutation is serialized by the shared env lock guard.
            None => unsafe { env::remove_var(key) },
        }
    }
}
209
#[cfg(test)]
impl Drop for TestEnvLockGuard {
    /// Decrements this thread's env-lock depth, never going below zero.
    fn drop(&mut self) {
        TEST_ENV_LOCK_DEPTH.with(|depth| depth.set(depth.get().saturating_sub(1)));
    }
}
221
/// Process-wide mutex backing [`acquire_test_runtime_lock`]; initialised on first use.
#[cfg(test)]
static TEST_RUNTIME_LOCK: OnceLock<Mutex<()>> = OnceLock::new();

#[cfg(test)]
thread_local! {
    /// Per-thread count of runtime-lock acquisitions whose guards are still alive.
    static TEST_RUNTIME_LOCK_DEPTH: std::cell::Cell<usize> = const { std::cell::Cell::new(0) };
}

/// Guard returned by [`acquire_test_runtime_lock`].
///
/// `_guard` holds the real mutex guard only for the outermost acquisition on a
/// thread; nested acquisitions carry `None`.
#[cfg(test)]
pub(crate) struct TestRuntimeLockGuard {
    _guard: Option<std::sync::MutexGuard<'static, ()>>,
}

/// Takes the shared test-runtime lock, allowing re-entry on the same thread.
///
/// The per-thread depth counter is bumped on every call. Only the call that
/// finds the counter at zero actually locks [`TEST_RUNTIME_LOCK`]; a poisoned
/// mutex is recovered via `into_inner` instead of panicking.
#[cfg(test)]
pub(crate) fn acquire_test_runtime_lock() -> TestRuntimeLockGuard {
    let depth_before = TEST_RUNTIME_LOCK_DEPTH.with(|depth| {
        let seen = depth.get();
        depth.set(seen + 1);
        seen
    });

    // Nested calls on this thread already own the mutex through the outer guard.
    let held = if depth_before == 0 {
        let mutex = TEST_RUNTIME_LOCK.get_or_init(|| Mutex::new(()));
        Some(mutex.lock().unwrap_or_else(|poisoned| poisoned.into_inner()))
    } else {
        None
    };

    TestRuntimeLockGuard { _guard: held }
}
254
#[cfg(test)]
impl Drop for TestRuntimeLockGuard {
    /// Decrements this thread's runtime-lock depth, never going below zero.
    fn drop(&mut self) {
        TEST_RUNTIME_LOCK_DEPTH.with(|depth| depth.set(depth.get().saturating_sub(1)));
    }
}
266
#[cfg(test)]
mod test_env_guard_tests {
    use super::*;

    /// Nested `TestEnvVarGuard::set` calls on one thread must not deadlock, and
    /// each guard must put back the value it replaced when it is dropped.
    #[test]
    fn test_env_var_guard_restores_previous_value_and_supports_nested_reentry() {
        const KEY: &str = "PRODEX_TEST_ENV_GUARD_REENTRY";
        let read_current = || env::var(KEY).ok();
        let before = env::var_os(KEY);

        let outer = TestEnvVarGuard::set(KEY, "outer");
        assert_eq!(read_current().as_deref(), Some("outer"));

        let inner = TestEnvVarGuard::set(KEY, "inner");
        assert_eq!(read_current().as_deref(), Some("inner"));
        drop(inner);

        // Dropping the inner guard restores what the outer guard had written.
        assert_eq!(read_current().as_deref(), Some("outer"));
        drop(outer);

        assert_eq!(env::var_os(KEY), before);
    }
}
291
292struct RuntimeRotationProxy {
293    server: Arc<TinyServer>,
294    shutdown: Arc<AtomicBool>,
295    worker_threads: Vec<thread::JoinHandle<()>>,
296    accept_worker_count: usize,
297    listen_addr: std::net::SocketAddr,
298    log_path: PathBuf,
299    active_request_count: Arc<AtomicUsize>,
300    owner_lock: Option<StateFileLock>,
301}
302
303type RuntimeLocalWebSocket = WsSocket<Box<dyn TinyReadWrite + Send>>;
304type RuntimeUpstreamWebSocket = WsSocket<MaybeTlsStream<TcpStream>>;
305
306fn runtime_set_upstream_websocket_io_timeout(
307    socket: &mut RuntimeUpstreamWebSocket,
308    timeout: Option<Duration>,
309) -> io::Result<()> {
310    match socket.get_mut() {
311        MaybeTlsStream::Plain(stream) => {
312            stream.set_read_timeout(timeout)?;
313            stream.set_write_timeout(timeout)?;
314        }
315        MaybeTlsStream::Rustls(stream) => {
316            stream.sock.set_read_timeout(timeout)?;
317            stream.sock.set_write_timeout(timeout)?;
318        }
319        _ => {}
320    }
321    Ok(())
322}
323
324fn runtime_websocket_timeout_error(err: &WsError) -> bool {
325    matches!(
326        err,
327        WsError::Io(io_err)
328            if matches!(
329                io_err.kind(),
330                io::ErrorKind::TimedOut | io::ErrorKind::WouldBlock
331            )
332    )
333}
334
335fn runtime_proxy_log(shared: &RuntimeRotationProxyShared, message: impl AsRef<str>) {
336    runtime_proxy_log_to_path(&shared.log_path, message.as_ref());
337}
338
339fn runtime_proxy_next_request_id(shared: &RuntimeRotationProxyShared) -> u64 {
340    shared.request_sequence.fetch_add(1, Ordering::Relaxed)
341}
342
343pub fn main_entry() {
344    if let Err(err) = run() {
345        eprintln!("Error: {err:#}");
346        std::process::exit(1);
347    }
348}
349
350fn run() -> Result<()> {
351    let command = parse_cli_command_or_exit();
352    if command.should_show_update_notice() {
353        let _ = show_update_notice_if_available(&command);
354    }
355    ensure_runtime_policy_valid()?;
356    command.execute()
357}
358
359fn parse_cli_command_or_exit() -> Commands {
360    match parse_cli_command_from(env::args_os()) {
361        Ok(command) => command,
362        Err(err) => err.exit(),
363    }
364}
365
366fn parse_cli_command_from<I, T>(args: I) -> std::result::Result<Commands, clap::Error>
367where
368    I: IntoIterator<Item = T>,
369    T: Into<OsString>,
370{
371    let raw_args = args.into_iter().map(Into::into).collect::<Vec<_>>();
372    let parse_args = if should_default_cli_invocation_to_run(&raw_args) {
373        rewrite_cli_args_as_run(&raw_args)
374    } else {
375        raw_args
376    };
377    Ok(Cli::try_parse_from(parse_args)?.command)
378}
379
/// Decides whether an invocation should be treated as an implicit `run`.
///
/// Looks only at the argument right after the program name. Returns `true`
/// when that argument is absent, is not valid UTF-8, or is not one of the
/// recognised flags and subcommands listed below.
fn should_default_cli_invocation_to_run(args: &[OsString]) -> bool {
    /// First arguments that must reach the parser unchanged.
    const EXPLICIT_FIRST_ARGS: &[&str] = &[
        "-h",
        "--help",
        "-V",
        "--version",
        "profile",
        "use",
        "current",
        "info",
        "doctor",
        "audit",
        "cleanup",
        "login",
        "logout",
        "quota",
        "run",
        "caveman",
        "super",
        "claude",
        "help",
        "__runtime-broker",
    ];

    match args.get(1).and_then(|arg| arg.to_str()) {
        Some(first_arg) => !EXPLICIT_FIRST_ARGS.contains(&first_arg),
        None => true,
    }
}
408
/// Builds a new argument vector with `run` inserted right after the program name.
///
/// If `args` is empty, `prodex` is used as the program name. All remaining
/// arguments are copied over in their original order.
fn rewrite_cli_args_as_run(args: &[OsString]) -> Vec<OsString> {
    let (program, forwarded) = match args.split_first() {
        Some((program, forwarded)) => (program.clone(), forwarded),
        // `args` is empty here, so it doubles as the empty tail.
        None => (OsString::from("prodex"), args),
    };

    let mut rewritten = Vec::with_capacity(args.len() + 1);
    rewritten.push(program);
    rewritten.push(OsString::from("run"));
    rewritten.extend_from_slice(forwarded);
    rewritten
}
420
/// Returns the codex executable to launch: the value of `PRODEX_CODEX_BIN`
/// when that variable is set, otherwise `codex`.
fn codex_bin() -> OsString {
    match env::var_os("PRODEX_CODEX_BIN") {
        Some(configured) => configured,
        None => OsString::from("codex"),
    }
}
424
/// Returns the claude executable to launch: the value of `PRODEX_CLAUDE_BIN`
/// when that variable is set, otherwise `claude`.
fn claude_bin() -> OsString {
    match env::var_os("PRODEX_CLAUDE_BIN") {
        Some(configured) => configured,
        None => OsString::from("claude"),
    }
}
428
429#[cfg(test)]
430#[path = "../tests/support/main_internal_harness.rs"]
431mod main_internal_tests;
432
433#[cfg(test)]
434#[path = "../tests/support/compat_replay_body.rs"]
435mod compat_replay_tests;