use super::*;
mod info;
mod runtime_launch;
mod selection;
mod shared;
pub(crate) use self::info::*;
pub(crate) use self::selection::*;
pub(crate) use self::shared::*;
pub(super) fn handle_info(_args: InfoArgs) -> Result<()> {
let paths = AppPaths::discover()?;
let state = AppState::load(&paths)?;
let policy_summary = runtime_policy_summary()?;
let runtime_metrics_targets = collect_runtime_broker_metrics_targets(&paths);
let now = Local::now().timestamp();
let version_summary = format_info_prodex_version(&paths)?;
let quota = collect_info_quota_aggregate(&paths, &state, now);
let processes = collect_prodex_processes();
let runtime_logs = collect_active_runtime_log_paths(&processes);
let runtime_load = collect_info_runtime_load_summary(&runtime_logs, now);
let runtime_process_count = processes.iter().filter(|process| process.runtime).count();
let five_hour_runway = estimate_info_runway(
&runtime_load.observations,
InfoQuotaWindow::FiveHour,
quota.five_hour_pool_remaining,
now,
);
let weekly_runway = estimate_info_runway(
&runtime_load.observations,
InfoQuotaWindow::Weekly,
quota.weekly_pool_remaining,
now,
);
let fields = vec![
("Profiles".to_string(), state.profiles.len().to_string()),
(
"Active profile".to_string(),
state.active_profile.as_deref().unwrap_or("-").to_string(),
),
(
"Runtime policy".to_string(),
format_runtime_policy_summary(policy_summary.as_ref()),
),
(
"Secret backend".to_string(),
format_secret_backend_summary(),
),
("Runtime logs".to_string(), format_runtime_logs_summary()),
("Audit logs".to_string(), format_audit_logs_summary()),
(
"Runtime metrics".to_string(),
format_runtime_broker_metrics_targets(&runtime_metrics_targets),
),
("Prodex version".to_string(), version_summary),
(
"Prodex processes".to_string(),
format_info_process_summary(&processes),
),
(
"Recent load".to_string(),
format_info_load_summary(&runtime_load, runtime_process_count),
),
(
"Quota data".to_string(),
format_info_quota_data_summary("a),
),
(
"5h remaining pool".to_string(),
format_info_pool_remaining(
quota.five_hour_pool_remaining,
quota.profiles_with_data(),
quota.earliest_five_hour_reset_at,
),
),
(
"Weekly remaining pool".to_string(),
format_info_pool_remaining(
quota.weekly_pool_remaining,
quota.profiles_with_data(),
quota.earliest_weekly_reset_at,
),
),
(
"5h runway".to_string(),
format_info_runway(
quota.profiles_with_data(),
quota.five_hour_pool_remaining,
quota.earliest_five_hour_reset_at,
five_hour_runway.as_ref(),
now,
),
),
(
"Weekly runway".to_string(),
format_info_runway(
quota.profiles_with_data(),
quota.weekly_pool_remaining,
quota.earliest_weekly_reset_at,
weekly_runway.as_ref(),
now,
),
),
];
print_panel("Info", &fields);
Ok(())
}
pub(super) fn handle_doctor(args: DoctorArgs) -> Result<()> {
let paths = AppPaths::discover()?;
let state = AppState::load(&paths)?;
let codex_home = default_codex_home(&paths)?;
let policy_summary = runtime_policy_summary()?;
let runtime_metrics_targets = collect_runtime_broker_metrics_targets(&paths);
if args.runtime && args.json {
let summary = collect_runtime_doctor_summary();
let mut value = runtime_doctor_json_value(&summary);
if let Some(object) = value.as_object_mut() {
object.insert(
"runtime_policy".to_string(),
runtime_policy_json_value(policy_summary.as_ref()),
);
object.insert("secret_backend".to_string(), secret_backend_json_value());
object.insert("runtime_logs".to_string(), runtime_logs_json_value());
object.insert("audit_logs".to_string(), audit_logs_json_value());
object.insert(
"live_brokers".to_string(),
serde_json::to_value(collect_live_runtime_broker_observations(&paths))
.unwrap_or_else(|_| serde_json::Value::Array(Vec::new())),
);
object.insert(
"live_broker_metrics_targets".to_string(),
serde_json::to_value(&runtime_metrics_targets)
.unwrap_or_else(|_| serde_json::Value::Array(Vec::new())),
);
}
let json = serde_json::to_string_pretty(&value)
.context("failed to serialize runtime doctor summary")?;
println!("{json}");
return Ok(());
}
let summary_fields = vec![
("Prodex root".to_string(), paths.root.display().to_string()),
(
"State file".to_string(),
format!(
"{} ({})",
paths.state_file.display(),
if paths.state_file.exists() {
"exists"
} else {
"missing"
}
),
),
(
"Profiles root".to_string(),
paths.managed_profiles_root.display().to_string(),
),
(
"Default CODEX_HOME".to_string(),
format!(
"{} ({})",
codex_home.display(),
if codex_home.exists() {
"exists"
} else {
"missing"
}
),
),
(
"Codex binary".to_string(),
format_binary_resolution(&codex_bin()),
),
(
"Quota endpoint".to_string(),
usage_url("a_base_url(None)),
),
(
"Runtime policy".to_string(),
format_runtime_policy_summary(policy_summary.as_ref()),
),
(
"Secret backend".to_string(),
format_secret_backend_summary(),
),
("Runtime logs".to_string(), format_runtime_logs_summary()),
("Audit logs".to_string(), format_audit_logs_summary()),
(
"Runtime metrics".to_string(),
format_runtime_broker_metrics_targets(&runtime_metrics_targets),
),
("Profiles".to_string(), state.profiles.len().to_string()),
(
"Active profile".to_string(),
state.active_profile.as_deref().unwrap_or("-").to_string(),
),
];
print_panel("Doctor", &summary_fields);
if args.runtime {
println!();
print_panel("Runtime Proxy", &runtime_doctor_fields());
}
if state.profiles.is_empty() {
return Ok(());
}
for report in collect_doctor_profile_reports(&state, args.quota) {
let kind = if report.summary.managed {
"managed"
} else {
"external"
};
println!();
let mut fields = vec![
(
"Current".to_string(),
if report.summary.active {
"Yes".to_string()
} else {
"No".to_string()
},
),
("Kind".to_string(), kind.to_string()),
(
"Provider".to_string(),
report.summary.provider.display_name().to_string(),
),
("Auth".to_string(), report.summary.auth.label),
(
"Identity".to_string(),
report.summary.email.as_deref().unwrap_or("-").to_string(),
),
(
"Path".to_string(),
report.summary.codex_home.display().to_string(),
),
(
"Exists".to_string(),
if report.summary.codex_home.exists() {
"Yes".to_string()
} else {
"No".to_string()
},
),
];
if let Some(quota) = report.quota {
match quota {
Ok(usage) => {
let blocked = collect_blocked_limits(&usage, false);
fields.push((
"Quota".to_string(),
if blocked.is_empty() {
"Ready".to_string()
} else {
format!("Blocked ({})", format_blocked_limits(&blocked))
},
));
fields.push(("Main".to_string(), format_main_windows(&usage)));
}
Err(err) => {
fields.push((
"Quota".to_string(),
format!("Error ({})", first_line_of_error(&err.to_string())),
));
}
}
}
print_panel(&format!("Profile {}", report.summary.name), &fields);
}
Ok(())
}
pub(super) fn handle_audit(args: AuditArgs) -> Result<()> {
let query = AuditLogQuery {
tail: args.tail,
component: args.component.as_deref().map(str::trim).map(str::to_string),
action: args.action.as_deref().map(str::trim).map(str::to_string),
outcome: args.outcome.as_deref().map(str::trim).map(str::to_string),
};
let events = read_recent_audit_events(&query)?;
if args.json {
let json = serde_json::to_string_pretty(&serde_json::json!({
"audit_logs": audit_logs_json_value(),
"filters": {
"tail": query.tail,
"component": query.component,
"action": query.action,
"outcome": query.outcome,
},
"events": events,
}))
.context("failed to serialize audit log output")?;
println!("{json}");
return Ok(());
}
println!(
"{}",
render_audit_events_human(&audit_log_path(), &query, &events)
);
Ok(())
}
pub(super) fn handle_cleanup() -> Result<()> {
let paths = AppPaths::discover()?;
let mut state = AppState::load(&paths)?;
let runtime_log_dir = runtime_proxy_log_dir();
let summary = perform_prodex_cleanup(&paths, &mut state)?;
let fields = vec![
("Prodex root".to_string(), paths.root.display().to_string()),
(
"Duplicate profiles".to_string(),
summary.duplicate_profiles_removed.to_string(),
),
(
"Duplicate managed homes".to_string(),
summary.duplicate_managed_profile_homes_removed.to_string(),
),
(
"Runtime logs".to_string(),
format!(
"{} removed from {}",
summary.runtime_logs_removed,
runtime_log_dir.display()
),
),
(
"Runtime pointer".to_string(),
if summary.stale_runtime_log_pointer_removed > 0 {
"removed stale latest-pointer file".to_string()
} else {
"clean".to_string()
},
),
(
"Temp login homes".to_string(),
summary.stale_login_dirs_removed.to_string(),
),
(
"Orphan managed homes".to_string(),
summary.orphan_managed_profile_dirs_removed.to_string(),
),
(
"Transient root files".to_string(),
summary.transient_root_files_removed.to_string(),
),
(
"Stale root temp files".to_string(),
summary.stale_root_temp_files_removed.to_string(),
),
(
"Dead broker leases".to_string(),
summary.dead_runtime_broker_leases_removed.to_string(),
),
(
"Dead broker registries".to_string(),
summary.dead_runtime_broker_registries_removed.to_string(),
),
(
"Total removed".to_string(),
summary.total_removed().to_string(),
),
];
print_panel("Cleanup", &fields);
Ok(())
}
pub(super) fn deserialize_null_default<'de, D, T>(
deserializer: D,
) -> std::result::Result<T, D::Error>
where
D: serde::Deserializer<'de>,
T: serde::Deserialize<'de> + Default,
{
Ok(Option::<T>::deserialize(deserializer)?.unwrap_or_default())
}
pub(super) fn handle_quota(args: QuotaArgs) -> Result<()> {
let paths = AppPaths::discover()?;
let state = AppState::load(&paths)?;
if args.all {
if state.profiles.is_empty() {
bail!("no profiles configured");
}
if quota_watch_enabled(&args) {
return watch_all_quotas(&paths, args.base_url.as_deref(), args.detail);
}
let reports = collect_quota_reports(&state, args.base_url.as_deref());
print_quota_reports(&reports, args.detail);
return Ok(());
}
let profile_name = resolve_profile_name(&state, args.profile.as_deref())?;
let profile = state
.profiles
.get(&profile_name)
.with_context(|| format!("profile '{}' is missing", profile_name))?;
if !profile.provider.supports_codex_runtime() {
bail!(
"profile '{}' uses {}. `prodex quota` currently supports OpenAI/Codex profiles only.",
profile_name,
profile.provider.display_name()
);
}
let codex_home = profile.codex_home.clone();
if args.raw {
let usage = fetch_usage_json(&codex_home, args.base_url.as_deref())?;
println!(
"{}",
serde_json::to_string_pretty(&usage).context("failed to render usage JSON")?
);
return Ok(());
}
if quota_watch_enabled(&args) {
return watch_quota(&profile_name, &codex_home, args.base_url.as_deref());
}
let usage = fetch_usage(&codex_home, args.base_url.as_deref())?;
println!("{}", render_profile_quota(&profile_name, &usage));
Ok(())
}
pub(super) fn handle_run(args: RunArgs) -> Result<()> {
runtime_launch::handle_run(args)
}
pub(super) fn prepare_runtime_launch(
request: RuntimeLaunchRequest<'_>,
) -> Result<PreparedRuntimeLaunch> {
runtime_launch::prepare_runtime_launch(request)
}
#[cfg_attr(not(test), allow(dead_code))]
pub(crate) fn resolve_runtime_launch_profile_name(
state: &AppState,
requested: Option<&str>,
) -> Result<String> {
runtime_launch::resolve_runtime_launch_profile_name(state, requested)
}
pub(super) fn handle_runtime_broker(args: RuntimeBrokerArgs) -> Result<()> {
let paths = AppPaths::discover()?;
let state = AppState::load(&paths)?;
let proxy = start_runtime_rotation_proxy_with_listen_addr(
&paths,
&state,
&args.current_profile,
args.upstream_base_url.clone(),
args.include_code_review,
args.listen_addr.as_deref(),
)?;
if proxy.owner_lock.is_none() {
return Ok(());
}
let metadata = RuntimeBrokerMetadata {
broker_key: runtime_broker_key(&args.upstream_base_url, args.include_code_review),
listen_addr: proxy.listen_addr.to_string(),
started_at: Local::now().timestamp(),
current_profile: args.current_profile.clone(),
include_code_review: args.include_code_review,
instance_token: args.instance_token.clone(),
admin_token: args.admin_token.clone(),
};
register_runtime_broker_metadata(&proxy.log_path, metadata.clone());
let registry = RuntimeBrokerRegistry {
pid: std::process::id(),
listen_addr: proxy.listen_addr.to_string(),
started_at: metadata.started_at,
upstream_base_url: args.upstream_base_url.clone(),
include_code_review: args.include_code_review,
current_profile: args.current_profile.clone(),
instance_token: args.instance_token.clone(),
admin_token: args.admin_token.clone(),
openai_mount_path: Some(RUNTIME_PROXY_OPENAI_MOUNT_PATH.to_string()),
};
save_runtime_broker_registry(&paths, &args.broker_key, ®istry)?;
runtime_proxy_log_to_path(
&proxy.log_path,
&format!(
"runtime_broker_started listen_addr={} broker_key={} current_profile={} include_code_review={}",
proxy.listen_addr, args.broker_key, args.current_profile, args.include_code_review
),
);
audit_log_event_best_effort(
"runtime_broker",
"start",
"success",
serde_json::json!({
"broker_key": args.broker_key,
"listen_addr": proxy.listen_addr.to_string(),
"current_profile": args.current_profile,
"include_code_review": args.include_code_review,
"upstream_base_url": args.upstream_base_url,
}),
);
let startup_grace_until = metadata
.started_at
.saturating_add(runtime_broker_startup_grace_seconds());
let poll_interval = Duration::from_millis(RUNTIME_BROKER_POLL_INTERVAL_MS);
let lease_scan_interval = Duration::from_millis(
RUNTIME_BROKER_LEASE_SCAN_INTERVAL_MS.max(RUNTIME_BROKER_POLL_INTERVAL_MS),
);
let mut idle_started_at = None::<i64>;
let mut cached_live_leases = 0usize;
let mut last_lease_scan_at = Instant::now() - lease_scan_interval;
loop {
let active_requests = proxy.active_request_count.load(Ordering::SeqCst);
if active_requests == 0 && last_lease_scan_at.elapsed() >= lease_scan_interval {
cached_live_leases = cleanup_runtime_broker_stale_leases(&paths, &args.broker_key);
last_lease_scan_at = Instant::now();
}
if cached_live_leases > 0 || active_requests > 0 {
idle_started_at = None;
} else {
let now = Local::now().timestamp();
if now < startup_grace_until {
idle_started_at = None;
thread::sleep(poll_interval);
continue;
}
let idle_since = idle_started_at.get_or_insert(now);
if now.saturating_sub(*idle_since) >= RUNTIME_BROKER_IDLE_GRACE_SECONDS {
runtime_proxy_log_to_path(
&proxy.log_path,
&format!(
"runtime_broker_idle_shutdown broker_key={} idle_seconds={}",
args.broker_key,
now.saturating_sub(*idle_since)
),
);
break;
}
}
thread::sleep(poll_interval);
}
drop(proxy);
remove_runtime_broker_registry_if_token_matches(&paths, &args.broker_key, &args.instance_token);
Ok(())
}
pub(super) fn run_child_plan(
plan: &ChildProcessPlan,
runtime_proxy: Option<&RuntimeProxyEndpoint>,
) -> Result<ExitStatus> {
let mut command = Command::new(&plan.binary);
command.args(&plan.args).env("CODEX_HOME", &plan.codex_home);
for key in &plan.removed_env {
command.env_remove(key);
}
for (key, value) in &plan.extra_env {
command.env(key, value);
}
let mut child = command
.spawn()
.with_context(|| format!("failed to execute {}", plan.binary.to_string_lossy()))?;
let _child_runtime_broker_lease = match runtime_proxy {
Some(proxy) => match proxy.create_child_lease(child.id()) {
Ok(lease) => Some(lease),
Err(err) => {
let _ = child.kill();
let _ = child.wait();
return Err(err);
}
},
None => None,
};
let status = child
.wait()
.with_context(|| format!("failed to wait for {}", plan.binary.to_string_lossy()))?;
Ok(status)
}
pub(super) fn exit_with_status(status: ExitStatus) -> Result<()> {
std::process::exit(status.code().unwrap_or(1));
}
pub(super) fn prepare_codex_launch_args(codex_args: &[OsString]) -> (Vec<OsString>, bool) {
let codex_args = normalize_run_codex_args(codex_args);
let include_code_review = is_review_invocation(&codex_args);
(codex_args, include_code_review)
}
pub(super) fn is_review_invocation(args: &[OsString]) -> bool {
args.iter().any(|arg| arg == "review")
}