use crate::*;
// Monotonic per-process counter (starts at 1) that makes each benchmark
// case's scratch paths and log files unique within one process run.
static BENCH_CASE_SEQUENCE: AtomicU64 = AtomicU64::new(1);
/// Returns the next unique benchmark-case id.
///
/// `Relaxed` ordering is sufficient: only uniqueness of the returned value
/// matters, not ordering relative to other memory operations.
fn bench_case_id() -> u64 {
BENCH_CASE_SEQUENCE.fetch_add(1, Ordering::Relaxed)
}
/// Builds an isolated `AppPaths` tree under the OS temp directory for one
/// benchmark case. The process id plus the per-process case sequence keep
/// concurrent and repeated runs from colliding on disk.
fn bench_paths(name: &str) -> AppPaths {
    let pid = std::process::id();
    let case = bench_case_id();
    let base = std::env::temp_dir().join(format!("prodex-bench-{name}-{pid}-{case}"));
    AppPaths {
        state_file: base.join("state.json"),
        managed_profiles_root: base.join("profiles"),
        shared_codex_root: base.join("shared"),
        legacy_shared_codex_root: base.join("legacy-shared"),
        root: base,
    }
}
/// Constructs a managed OpenAI `ProfileEntry` whose codex home lives under
/// the fixture's managed-profiles directory.
fn bench_profile_entry(paths: &AppPaths, name: &str) -> ProfileEntry {
    let codex_home = paths.managed_profiles_root.join(name);
    ProfileEntry {
        codex_home,
        managed: true,
        email: None,
        provider: ProfileProvider::Openai,
    }
}
/// Synthesizes a `UsageResponse` whose primary (5-hour) and secondary
/// (weekly) rate-limit windows report the given used percentages, each
/// resetting one full window length after `now`.
fn bench_usage(now: i64, primary_used_percent: i64, weekly_used_percent: i64) -> UsageResponse {
    const PRIMARY_WINDOW_SECONDS: i64 = 5 * 60 * 60;
    const WEEKLY_WINDOW_SECONDS: i64 = 7 * 24 * 60 * 60;
    let primary = UsageWindow {
        used_percent: Some(primary_used_percent),
        reset_at: Some(now + PRIMARY_WINDOW_SECONDS),
        limit_window_seconds: Some(PRIMARY_WINDOW_SECONDS),
    };
    let secondary = UsageWindow {
        used_percent: Some(weekly_used_percent),
        reset_at: Some(now + WEEKLY_WINDOW_SECONDS),
        limit_window_seconds: Some(WEEKLY_WINDOW_SECONDS),
    };
    UsageResponse {
        email: None,
        plan_type: Some("bench".to_string()),
        rate_limit: Some(WindowPair {
            primary_window: Some(primary),
            secondary_window: Some(secondary),
        }),
        code_review_rate_limit: None,
        additional_rate_limits: Vec::new(),
    }
}
/// Usage with plenty of headroom (10% primary / 20% weekly used) — a
/// profile that selection policies should treat as readily available.
fn bench_ready_usage(now: i64) -> UsageResponse {
    const READY_PRIMARY_USED_PERCENT: i64 = 10;
    const READY_WEEKLY_USED_PERCENT: i64 = 20;
    bench_usage(now, READY_PRIMARY_USED_PERCENT, READY_WEEKLY_USED_PERCENT)
}
/// Probe-cache entry whose probe itself failed (`Err`) but whose auth is
/// flagged quota-compatible, so the profile still participates in
/// quota-fallback eligibility checks.
fn bench_quota_compatible_probe_entry(now: i64) -> RuntimeProfileProbeCacheEntry {
    let auth = AuthSummary {
        label: "chatgpt".to_string(),
        quota_compatible: true,
    };
    RuntimeProfileProbeCacheEntry {
        checked_at: now,
        auth,
        result: Err("bench".to_string()),
    }
}
/// Successful probe-cache entry reporting the given primary/weekly usage
/// percentages under a quota-compatible "chatgpt" auth.
fn bench_probe_entry(
    now: i64,
    primary_used_percent: i64,
    weekly_used_percent: i64,
) -> RuntimeProfileProbeCacheEntry {
    let usage = bench_usage(now, primary_used_percent, weekly_used_percent);
    let auth = AuthSummary {
        label: "chatgpt".to_string(),
        quota_compatible: true,
    };
    RuntimeProfileProbeCacheEntry {
        checked_at: now,
        auth,
        result: Ok(usage),
    }
}
fn bench_ready_probe_entry(now: i64) -> RuntimeProfileProbeCacheEntry {
RuntimeProfileProbeCacheEntry {
checked_at: now,
auth: AuthSummary {
label: "chatgpt".to_string(),
quota_compatible: true,
},
result: Ok(bench_ready_usage(now)),
}
}
/// Assembles a `RuntimeRotationProxyShared` around `state` with fresh
/// counters, a single-worker Tokio runtime, and a per-case log path in the
/// OS temp directory. Lane admission is derived from
/// `runtime_proxy_lane_limits(active_request_limit, 1, 1)`.
fn bench_runtime_shared(
    name: &str,
    state: RuntimeRotationState,
    active_request_limit: usize,
) -> RuntimeRotationProxyShared {
    let async_client = reqwest::Client::builder()
        .build()
        .expect("benchmark async client should build");
    let async_runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(1)
        .enable_all()
        .build()
        .expect("benchmark async runtime should build");
    // Same pid + case-sequence scheme as `bench_paths`, so log files from
    // concurrent or repeated cases never collide.
    let log_path = std::env::temp_dir().join(format!(
        "prodex-bench-{name}-{}-{}.log",
        std::process::id(),
        bench_case_id()
    ));
    let lane_limits = runtime_proxy_lane_limits(active_request_limit, 1, 1);
    RuntimeRotationProxyShared {
        async_client,
        async_runtime: Arc::new(async_runtime),
        runtime: Arc::new(Mutex::new(state)),
        log_path,
        request_sequence: Arc::new(AtomicU64::new(1)),
        state_save_revision: Arc::new(AtomicU64::new(0)),
        local_overload_backoff_until: Arc::new(AtomicU64::new(0)),
        active_request_count: Arc::new(AtomicUsize::new(0)),
        active_request_limit,
        lane_admission: RuntimeProxyLaneAdmission::new(lane_limits),
    }
}
#[doc(hidden)]
/// Benchmark fixture for quota-fallback eligibility checks
/// (`runtime_has_route_eligible_quota_fallback`).
pub struct RuntimeProxyQuotaFallbackBenchCase {
// Shared proxy runtime holding the synthetic rotation state.
shared: RuntimeRotationProxyShared,
// Profiles excluded from selection (left empty by `new`).
excluded_profiles: BTreeSet<String>,
// Profile whose fallback eligibility is probed ("profile-000").
profile_name: String,
}
impl RuntimeProxyQuotaFallbackBenchCase {
    /// Builds a pool of at least two quota-compatible profiles (all with
    /// failed probes) where every interior profile — neither first nor last —
    /// carries a saturated synthetic in-flight count, so the eligibility
    /// check has to scan past them.
    pub fn new(profile_count: usize) -> Self {
        let profile_count = profile_count.max(2);
        let paths = bench_paths("quota-fallback");
        let now = Local::now().timestamp();
        let mut profiles = BTreeMap::new();
        let mut probe_cache = BTreeMap::new();
        let mut profile_inflight = BTreeMap::new();
        let last_index = profile_count - 1;
        for index in 0..profile_count {
            let name = format!("profile-{index:03}");
            profiles.insert(name.clone(), bench_profile_entry(&paths, &name));
            probe_cache.insert(name.clone(), bench_quota_compatible_probe_entry(now));
            // Saturate interior profiles with synthetic in-flight load.
            if index != 0 && index != last_index {
                profile_inflight.insert(name, usize::MAX / 4);
            }
        }
        let profile_name = "profile-000".to_string();
        let state = RuntimeRotationState {
            paths,
            state: AppState {
                active_profile: Some(profile_name.clone()),
                profiles,
                last_run_selected_at: BTreeMap::new(),
                response_profile_bindings: BTreeMap::new(),
                session_profile_bindings: BTreeMap::new(),
            },
            upstream_base_url: "https://chatgpt.com/backend-api".to_string(),
            include_code_review: false,
            current_profile: profile_name.clone(),
            profile_usage_auth: BTreeMap::new(),
            turn_state_bindings: BTreeMap::new(),
            session_id_bindings: BTreeMap::new(),
            continuation_statuses: RuntimeContinuationStatuses::default(),
            profile_probe_cache: probe_cache,
            profile_usage_snapshots: BTreeMap::new(),
            profile_retry_backoff_until: BTreeMap::new(),
            profile_transport_backoff_until: BTreeMap::new(),
            profile_route_circuit_open_until: BTreeMap::new(),
            profile_inflight,
            profile_health: BTreeMap::new(),
        };
        Self {
            shared: bench_runtime_shared("quota-fallback", state, 32),
            excluded_profiles: BTreeSet::new(),
            profile_name,
        }
    }
    /// Runs one route-eligibility check for the fixture's profile on the
    /// Responses route.
    pub fn has_route_eligible_quota_fallback(&self) -> bool {
        let eligible = runtime_has_route_eligible_quota_fallback(
            &self.shared,
            &self.profile_name,
            &self.excluded_profiles,
            RuntimeRouteKind::Responses,
        );
        eligible.expect("benchmark selection should succeed")
    }
}
#[doc(hidden)]
/// Benchmark fixture for previous-response candidate selection
/// (`next_runtime_previous_response_candidate`).
pub struct RuntimeProxyPreviousResponseBenchCase {
// Shared proxy runtime holding the synthetic rotation state.
shared: RuntimeRotationProxyShared,
// Profiles excluded from selection (left empty by `new`).
excluded_profiles: BTreeSet<String>,
// The previous-response id every selection call resolves ("resp-bench").
previous_response_id: String,
}
impl RuntimeProxyPreviousResponseBenchCase {
    /// Builds a pool of at least 16 profiles where every seventh profile is
    /// near quota exhaustion (96%/96% used) and the first third carries a
    /// negative-cache health entry for the fixture's previous-response id,
    /// so candidate selection must skip both kinds of profile.
    pub fn new(profile_count: usize) -> Self {
        let profile_count = profile_count.max(16);
        let paths = bench_paths("previous-response-selection");
        let now = Local::now().timestamp();
        let previous_response_id = "resp-bench".to_string();
        let mut profiles = BTreeMap::new();
        let mut probe_cache = BTreeMap::new();
        let mut last_run_selected_at = BTreeMap::new();
        let mut profile_health = BTreeMap::new();
        let negative_cache_count = profile_count / 3;
        for index in 0..profile_count {
            let name = format!("profile-{index:03}");
            // Every seventh profile is nearly exhausted; the rest are ready.
            let probe = if index % 7 == 0 {
                bench_probe_entry(now, 96, 96)
            } else {
                bench_ready_probe_entry(now)
            };
            profiles.insert(name.clone(), bench_profile_entry(&paths, &name));
            probe_cache.insert(name.clone(), probe);
            // Higher-indexed profiles were selected further in the past.
            last_run_selected_at.insert(name.clone(), now - index as i64);
            if index < negative_cache_count {
                let key = runtime_previous_response_negative_cache_key(
                    &previous_response_id,
                    &name,
                    RuntimeRouteKind::Responses,
                );
                profile_health.insert(
                    key,
                    RuntimeProfileHealth {
                        score: 1,
                        updated_at: now,
                    },
                );
            }
        }
        let current_profile = "profile-000".to_string();
        let state = RuntimeRotationState {
            paths,
            state: AppState {
                active_profile: Some(current_profile.clone()),
                profiles,
                last_run_selected_at,
                response_profile_bindings: BTreeMap::new(),
                session_profile_bindings: BTreeMap::new(),
            },
            upstream_base_url: "https://chatgpt.com/backend-api".to_string(),
            include_code_review: false,
            current_profile,
            profile_usage_auth: BTreeMap::new(),
            turn_state_bindings: BTreeMap::new(),
            session_id_bindings: BTreeMap::new(),
            continuation_statuses: RuntimeContinuationStatuses::default(),
            profile_probe_cache: probe_cache,
            profile_usage_snapshots: BTreeMap::new(),
            profile_retry_backoff_until: BTreeMap::new(),
            profile_transport_backoff_until: BTreeMap::new(),
            profile_route_circuit_open_until: BTreeMap::new(),
            profile_inflight: BTreeMap::new(),
            profile_health,
        };
        Self {
            shared: bench_runtime_shared("previous-response-selection", state, 32),
            excluded_profiles: BTreeSet::new(),
            previous_response_id,
        }
    }
    /// Runs one previous-response candidate selection on the Responses route.
    pub fn next_previous_response_candidate(&self) -> Option<String> {
        let candidate = next_runtime_previous_response_candidate(
            &self.shared,
            &self.excluded_profiles,
            Some(&self.previous_response_id),
            RuntimeRouteKind::Responses,
        );
        candidate.expect("benchmark previous response selection should succeed")
    }
}
#[doc(hidden)]
/// Benchmark fixture for fresh-candidate selection over a mixed pool
/// (`select_runtime_response_candidate_for_route`).
pub struct RuntimeProxyMixedPoolSelectionBenchCase {
// Shared proxy runtime holding the synthetic rotation state.
shared: RuntimeRotationProxyShared,
// Profiles excluded from selection (populated by `new` — first eighth).
excluded_profiles: BTreeSet<String>,
}
impl RuntimeProxyMixedPoolSelectionBenchCase {
// Builds a pool of at least 32 profiles in a deliberately mixed condition so
// fresh-candidate selection has to evaluate every filter at once:
// - two of every 12 profiles are quota-exhausted (97%/97%) and one in 12 is
//   borderline (88%/94%);
// - the first eighth of the pool is excluded outright;
// - the next chunk (up to the first third) cycles through retry backoff,
//   transport backoff, an open route circuit, saturated inflight, and a
//   degraded route-health score;
// - past the first third, every 17th profile carries light inflight load.
pub fn new(profile_count: usize) -> Self {
let profile_count = profile_count.max(32);
let paths = bench_paths("mixed-pool-selection");
let now = Local::now().timestamp();
let mut profiles = BTreeMap::new();
let mut probe_cache = BTreeMap::new();
let mut last_run_selected_at = BTreeMap::new();
let mut retry_backoff_until = BTreeMap::new();
let mut transport_backoff_until = BTreeMap::new();
let mut route_circuit_open_until = BTreeMap::new();
let mut profile_inflight = BTreeMap::new();
let mut profile_health = BTreeMap::new();
let mut excluded_profiles = BTreeSet::new();
for index in 0..profile_count {
let name = format!("profile-{index:03}");
profiles.insert(name.clone(), bench_profile_entry(&paths, &name));
// Usage mix per 12-profile cycle: indexes 1-2 exhausted, 3 borderline,
// the rest with varied but comfortable headroom.
let (primary_used, weekly_used) = match index % 12 {
1 | 2 => (97, 97),
3 => (88, 94),
_ => (10 + (index % 20) as i64, 20 + (index % 15) as i64),
};
probe_cache.insert(
name.clone(),
bench_probe_entry(now, primary_used, weekly_used),
);
// Higher-indexed profiles were selected further in the past.
last_run_selected_at.insert(name.clone(), now - index as i64);
if index < profile_count / 8 {
// First eighth: excluded from selection entirely.
excluded_profiles.insert(name.clone());
} else if index < profile_count / 3 {
// Up to the first third: rotate through the remaining penalty kinds.
match index % 5 {
0 => {
retry_backoff_until.insert(name.clone(), now + 90);
}
1 => {
transport_backoff_until.insert(
runtime_profile_transport_backoff_key(
&name,
RuntimeRouteKind::Responses,
),
now + 45,
);
}
2 => {
route_circuit_open_until.insert(
runtime_profile_route_circuit_key(&name, RuntimeRouteKind::Responses),
now + 30,
);
}
3 => {
// Effectively saturated in-flight counter.
profile_inflight.insert(name.clone(), usize::MAX / 4);
}
_ => {
profile_health.insert(
runtime_profile_route_health_key(&name, RuntimeRouteKind::Responses),
RuntimeProfileHealth {
score: 3,
updated_at: now,
},
);
}
}
} else if index % 17 == 0 {
// Healthy tail: a sprinkling of light in-flight load.
profile_inflight.insert(name.clone(), 2);
}
}
let current_profile = "profile-000".to_string();
let state = RuntimeRotationState {
paths,
state: AppState {
active_profile: Some(current_profile.clone()),
profiles,
last_run_selected_at,
response_profile_bindings: BTreeMap::new(),
session_profile_bindings: BTreeMap::new(),
},
upstream_base_url: "https://chatgpt.com/backend-api".to_string(),
include_code_review: false,
current_profile,
profile_usage_auth: BTreeMap::new(),
turn_state_bindings: BTreeMap::new(),
session_id_bindings: BTreeMap::new(),
continuation_statuses: RuntimeContinuationStatuses::default(),
profile_probe_cache: probe_cache,
profile_usage_snapshots: BTreeMap::new(),
profile_retry_backoff_until: retry_backoff_until,
profile_transport_backoff_until: transport_backoff_until,
profile_route_circuit_open_until: route_circuit_open_until,
profile_inflight,
profile_health,
};
Self {
shared: bench_runtime_shared("mixed-pool-selection", state, 64),
excluded_profiles,
}
}
// Runs one fresh candidate selection (no previous-response/session hints).
// NOTE(review): the None/false positional arguments presumably disable the
// optional routing hints — confirm against the signature of
// `select_runtime_response_candidate_for_route`.
pub fn select_fresh_response_candidate(&self) -> Option<String> {
select_runtime_response_candidate_for_route(
&self.shared,
&self.excluded_profiles,
None,
None,
None,
None,
false,
None,
RuntimeRouteKind::Responses,
)
.expect("benchmark mixed-pool selection should succeed")
}
}
#[doc(hidden)]
/// Benchmark fixture for SSE buffer inspection
/// (`inspect_runtime_sse_buffer`).
pub struct RuntimeProxySseInspectBenchCase {
// Pre-rendered SSE byte stream parsed on every `inspect` call.
buffer: Vec<u8>,
}
impl RuntimeProxySseInspectBenchCase {
    /// Renders an SSE byte stream of `event_count` (at least 1) streaming
    /// events — cycling through six event types, with a keep-alive comment
    /// line before every eighth — terminated by a `response.completed` event
    /// that carries a turn state.
    pub fn new(event_count: usize) -> Self {
        const EVENT_TYPES: [&str; 6] = [
            "response.created",
            "response.in_progress",
            "response.output_item.added",
            "response.content_part.added",
            "response.output_text.delta",
            "response.reasoning_summary_text.delta",
        ];
        let event_count = event_count.max(1);
        let mut buffer = Vec::new();
        for index in 0..event_count {
            if index % 8 == 0 {
                buffer.extend_from_slice(b": keep-alive\r\n");
            }
            let event_type = EVENT_TYPES[index % EVENT_TYPES.len()];
            let frame = format!(
                "event: {event_type}\r\ndata: {{\"type\":\"{event_type}\",\"response_id\":\"resp-{index:03}\",\"delta\":\"bench-token-{index:03}\"}}\r\n\r\n"
            );
            buffer.extend_from_slice(frame.as_bytes());
        }
        buffer.extend_from_slice(
            b"event: response.completed\r\ndata: {\"type\":\"response.completed\",\"response\":{\"id\":\"resp-tail\",\"turn_state\":\"turn-tail\"}}\r\n\r\n",
        );
        Self { buffer }
    }
    /// Parses the buffer once and reduces the outcome to a small integer
    /// (response-id count plus one if a turn state was seen) so the work
    /// cannot be optimized away.
    pub fn inspect(&self) -> usize {
        let progress = inspect_runtime_sse_buffer(&self.buffer)
            .expect("benchmark SSE parse should succeed");
        match progress {
            RuntimeSseInspectionProgress::Hold {
                response_ids,
                turn_state,
            }
            | RuntimeSseInspectionProgress::Commit {
                response_ids,
                turn_state,
            } => {
                let turn_state_bonus = if turn_state.is_some() { 1 } else { 0 };
                response_ids.len() + turn_state_bonus
            }
            RuntimeSseInspectionProgress::QuotaBlocked
            | RuntimeSseInspectionProgress::PreviousResponseNotFound => 0,
        }
    }
}
#[doc(hidden)]
/// Benchmark fixture for dead-response lineage cleanup
/// (`clear_runtime_dead_response_bindings`).
pub struct RuntimeProxyLineageCleanupBenchCase {
// Shared proxy runtime that the cleanup mutates in place.
shared: RuntimeRotationProxyShared,
// Pristine copy of the rotation state, restored before each measurement.
template: RuntimeRotationState,
// The profile ("main") that owns every binding in the fixture.
profile_name: String,
// Response ids treated as dead (just "resp-target").
response_ids: Vec<String>,
}
impl RuntimeProxyLineageCleanupBenchCase {
    /// Builds a state in which `resp-target` is bound to the single profile
    /// and linked via lineage keys to every turn state, while every other
    /// (even-indexed) turn state is additionally linked to its own survivor
    /// response — so cleanup must prune only the target's lineage.
    pub fn new(turn_state_count: usize) -> Self {
        let turn_state_count = turn_state_count.max(2);
        let paths = bench_paths("lineage-cleanup");
        let now = Local::now().timestamp();
        let profile_name = "main".to_string();
        let target_response_id = "resp-target".to_string();
        // All bindings share the same owner and timestamp.
        let bind = |profile: &str| ResponseProfileBinding {
            profile_name: profile.to_string(),
            bound_at: now,
        };
        let mut response_profile_bindings = BTreeMap::new();
        let mut turn_state_bindings = BTreeMap::new();
        response_profile_bindings.insert(target_response_id.clone(), bind(&profile_name));
        for index in 0..turn_state_count {
            let turn_state = format!("turn-{index:03}");
            turn_state_bindings.insert(turn_state.clone(), bind(&profile_name));
            response_profile_bindings.insert(
                runtime_response_turn_state_lineage_key(&target_response_id, &turn_state),
                bind(&profile_name),
            );
            if index % 2 == 0 {
                let survivor_response_id = format!("resp-survivor-{index:03}");
                response_profile_bindings
                    .insert(survivor_response_id.clone(), bind(&profile_name));
                response_profile_bindings.insert(
                    runtime_response_turn_state_lineage_key(&survivor_response_id, &turn_state),
                    bind(&profile_name),
                );
            }
        }
        let template = RuntimeRotationState {
            paths,
            state: AppState {
                active_profile: Some(profile_name.clone()),
                profiles: BTreeMap::from([(
                    profile_name.clone(),
                    ProfileEntry {
                        codex_home: PathBuf::from("/tmp/prodex-bench/main"),
                        managed: true,
                        email: None,
                        provider: ProfileProvider::Openai,
                    },
                )]),
                last_run_selected_at: BTreeMap::new(),
                response_profile_bindings,
                session_profile_bindings: BTreeMap::new(),
            },
            upstream_base_url: "https://chatgpt.com/backend-api".to_string(),
            include_code_review: false,
            current_profile: profile_name.clone(),
            profile_usage_auth: BTreeMap::new(),
            turn_state_bindings,
            session_id_bindings: BTreeMap::new(),
            continuation_statuses: RuntimeContinuationStatuses::default(),
            profile_probe_cache: BTreeMap::new(),
            profile_usage_snapshots: BTreeMap::new(),
            profile_retry_backoff_until: BTreeMap::new(),
            profile_transport_backoff_until: BTreeMap::new(),
            profile_route_circuit_open_until: BTreeMap::new(),
            profile_inflight: BTreeMap::new(),
            profile_health: BTreeMap::new(),
        };
        Self {
            shared: bench_runtime_shared("lineage-cleanup", template.clone(), 8),
            template,
            profile_name,
            response_ids: vec![target_response_id],
        }
    }
    /// Restores the pristine template state, runs one cleanup pass, and
    /// reports how many turn-state bindings survived.
    pub fn clear_dead_response_bindings(&self) -> usize {
        {
            // Reset under the lock, recovering from poisoning since a
            // panicked benchmark iteration must not wedge later ones.
            let mut runtime = self
                .shared
                .runtime
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            *runtime = self.template.clone();
        }
        clear_runtime_dead_response_bindings(
            &self.shared,
            &self.profile_name,
            &self.response_ids,
            "bench_cleanup",
        )
        .expect("benchmark lineage cleanup should succeed");
        self.shared
            .runtime
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .turn_state_bindings
            .len()
    }
}