use super::*;
mod config_failover;
mod response_semantics;
/// Builds a fully-populated `RetryLayerConfig` for tests.
///
/// Every backoff/jitter knob is zeroed so retries fire immediately and the
/// tests stay fast; only the attempt budget, trigger status/class filters,
/// and the retry strategy vary per call site.
fn retry_layer_config(
    max_attempts: u32,
    on_status: &str,
    on_class: Vec<String>,
    strategy: RetryStrategy,
) -> crate::config::RetryLayerConfig {
    crate::config::RetryLayerConfig {
        strategy: Some(strategy),
        max_attempts: Some(max_attempts),
        on_status: Some(on_status.to_owned()),
        on_class: Some(on_class),
        // No delays between attempts: keep test retries instantaneous.
        backoff_ms: Some(0),
        backoff_max_ms: Some(0),
        jitter_ms: Some(0),
    }
}
/// Convenience wrapper around [`retry_config_with_cooldowns`] with every
/// cooldown window set to zero, i.e. no cross-request penalty is applied
/// after a failed attempt.
fn retry_config(
    max_attempts: u32,
    on_status: &str,
    on_class: Vec<String>,
    strategy: RetryStrategy,
) -> RetryConfig {
    retry_config_with_cooldowns(max_attempts, on_status, on_class, strategy, 0, 0, 0)
}
/// Builds a `RetryConfig` whose upstream layer uses the supplied retry
/// parameters and whose three cooldown windows (Cloudflare challenge,
/// Cloudflare timeout, transport error) are set explicitly by the caller.
fn retry_config_with_cooldowns(
    max_attempts: u32,
    on_status: &str,
    on_class: Vec<String>,
    strategy: RetryStrategy,
    cloudflare_challenge_cooldown_secs: u64,
    cloudflare_timeout_cooldown_secs: u64,
    transport_cooldown_secs: u64,
) -> RetryConfig {
    // Only the upstream retry layer is configured here; everything else
    // falls back to `RetryConfig::default()` via struct update syntax.
    let upstream_layer = retry_layer_config(max_attempts, on_status, on_class, strategy);
    RetryConfig {
        upstream: Some(upstream_layer),
        cloudflare_challenge_cooldown_secs: Some(cloudflare_challenge_cooldown_secs),
        cloudflare_timeout_cooldown_secs: Some(cloudflare_timeout_cooldown_secs),
        transport_cooldown_secs: Some(transport_cooldown_secs),
        // Flat cooldown growth keeps repeated failures deterministic in tests.
        cooldown_backoff_factor: Some(1),
        cooldown_backoff_max_secs: Some(0),
        ..Default::default()
    }
}
/// End-to-end smoke test: runs the shared 502-failover scenario
/// (see [`run_failover_retries_502_then_uses_second_upstream`]).
#[tokio::test]
async fn proxy_failover_retries_502_then_uses_second_upstream() {
    run_failover_retries_502_then_uses_second_upstream().await;
}
/// Verifies the production request path is wired through the route-plan
/// executor: the executor's test invocation counter must grow after the
/// shared failover scenario runs.
#[tokio::test]
async fn production_request_path_uses_route_plan_executor() {
    // Sample the counter, drive real traffic through the proxy, sample again.
    let invocations_before =
        super::super::provider_execution::route_executor_request_path_test_invocations();
    run_failover_retries_502_then_uses_second_upstream().await;
    let invocations_after =
        super::super::provider_execution::route_executor_request_path_test_invocations();
    assert!(invocations_after > invocations_before);
}
/// Shared scenario used by multiple tests.
///
/// Upstream 1 always answers 502; upstream 2 answers 200. With
/// `max_attempts = 2` and the `Failover` strategy the proxy is expected to
/// try upstream 1 twice, fail over to upstream 2, and return its body —
/// recording a three-entry route-attempt trace and a route decision that
/// points at upstream 2. Stale station overrides are installed beforehand
/// to check they do not interfere with the legacy failover path.
async fn run_failover_retries_502_then_uses_second_upstream() {
    let upstream1_hits = Arc::new(AtomicUsize::new(0));
    let upstream2_hits = Arc::new(AtomicUsize::new(0));
    // Upstream 1: unconditionally 502.
    let u1_hits = upstream1_hits.clone();
    let upstream1 = axum::Router::new().route(
        "/v1/responses",
        post(move || async move {
            u1_hits.fetch_add(1, Ordering::SeqCst);
            (
                StatusCode::BAD_GATEWAY,
                Json(serde_json::json!({ "err": "nope" })),
            )
        }),
    );
    let (u1_addr, u1_handle) = spawn_axum_server(upstream1);
    // Upstream 2: unconditionally 200 with a marker body.
    let u2_hits = upstream2_hits.clone();
    let upstream2 = axum::Router::new().route(
        "/v1/responses",
        post(move || async move {
            u2_hits.fetch_add(1, Ordering::SeqCst);
            (
                StatusCode::OK,
                Json(serde_json::json!({ "ok": true, "upstream": 2 })),
            )
        }),
    );
    let (u2_addr, u2_handle) = spawn_axum_server(upstream2);
    let proxy_client = Client::new();
    // Two attempts on 502, failover strategy, no cooldowns.
    let retry = retry_config(2, "502", Vec::new(), RetryStrategy::Failover);
    let cfg = make_proxy_config(
        vec![
            UpstreamConfig {
                base_url: format!("http://{}/v1", u1_addr),
                auth: UpstreamAuth {
                    auth_token: None,
                    auth_token_env: None,
                    api_key: None,
                    api_key_env: None,
                },
                tags: {
                    let mut t = HashMap::new();
                    t.insert("provider_id".to_string(), "u1".to_string());
                    t
                },
                supported_models: HashMap::new(),
                model_mapping: HashMap::new(),
            },
            UpstreamConfig {
                base_url: format!("http://{}/v1", u2_addr),
                auth: UpstreamAuth {
                    auth_token: None,
                    auth_token_env: None,
                    api_key: None,
                    api_key_env: None,
                },
                tags: {
                    let mut t = HashMap::new();
                    t.insert("provider_id".to_string(), "u2".to_string());
                    t
                },
                supported_models: HashMap::new(),
                model_mapping: HashMap::new(),
            },
        ],
        retry,
    );
    let proxy = ProxyService::new(
        proxy_client,
        Arc::new(cfg),
        "codex",
        Arc::new(std::sync::Mutex::new(HashMap::new())),
    );
    let state = proxy.state.clone();
    // Install stale global and session station pins; the legacy failover
    // path below must still route by upstream order, not by these pins.
    state
        .set_global_station_override("stale-station-pin".to_string(), 1)
        .await;
    state
        .set_session_station_override(
            "sid-input".to_string(),
            "stale-session-station-pin".to_string(),
            1,
        )
        .await;
    let app = crate::proxy::router(proxy);
    let (proxy_addr, proxy_handle) = spawn_axum_server(app);
    // Single client request through the proxy.
    let client = reqwest::Client::new();
    let req = client
        .post(format!("http://{}/v1/responses", proxy_addr))
        .header("content-type", "application/json")
        .body(r#"{"model":"gpt","input":"hi"}"#);
    let resp = req.send().await.expect("send");
    assert_eq!(resp.status(), StatusCode::OK);
    let body = resp.text().await.expect("text");
    assert!(
        body.contains(r#""upstream":2"#),
        "expected response from upstream2, got: {body}"
    );
    // Inspect the finished-request record for the successful request.
    let finished = state.list_recent_finished(10).await;
    let request = finished
        .iter()
        .find(|request| request.status_code == StatusCode::OK)
        .expect("finished request");
    let retry = request.retry.as_ref().expect("retry info");
    // Expected trace: u1 attempt 1 (502), u1 attempt 2 (502), u2 (200).
    assert_eq!(retry.route_attempts.len(), 3);
    assert_eq!(
        retry
            .route_attempts
            .iter()
            .map(|attempt| attempt.endpoint_id.as_deref())
            .collect::<Vec<_>>(),
        vec![Some("0"), Some("0"), Some("1")]
    );
    assert_eq!(
        retry
            .route_attempts
            .iter()
            .map(|attempt| attempt.provider_endpoint_key.as_deref())
            .collect::<Vec<_>>(),
        vec![Some("codex/u1/0"), Some("codex/u1/0"), Some("codex/u2/1")]
    );
    // All attempts stay inside preference group 0 in this legacy setup.
    assert_eq!(
        retry
            .route_attempts
            .iter()
            .map(|attempt| attempt.preference_group)
            .collect::<Vec<_>>(),
        vec![Some(0), Some(0), Some(0)]
    );
    assert_eq!(
        retry
            .route_attempts
            .iter()
            .map(|attempt| attempt.route_path.clone())
            .collect::<Vec<_>>(),
        vec![
            vec!["legacy".to_string(), "test".to_string(), "u1".to_string()],
            vec!["legacy".to_string(), "test".to_string(), "u1".to_string()],
            vec!["legacy".to_string(), "test".to_string(), "u2".to_string()],
        ]
    );
    // The final route decision must point at the upstream that answered.
    let route_decision = request
        .route_decision
        .as_ref()
        .expect("finished route decision");
    assert_eq!(route_decision.endpoint_id.as_deref(), Some("1"));
    assert_eq!(route_decision.route_path, vec!["legacy", "test", "u2"]);
    assert_eq!(upstream1_hits.load(Ordering::SeqCst), 2);
    assert_eq!(upstream2_hits.load(Ordering::SeqCst), 1);
    proxy_handle.abort();
    u1_handle.abort();
    u2_handle.abort();
}
/// V4 route graph: session affinity must be scoped per session id.
///
/// Setup: providers `input` and `input1` are tagged `billing=monthly` and sit
/// in an ordered-failover pool preferred by a tag-preferred root; `right` is
/// the untagged fallback. `input` fails only on its 2nd hit, `input1` only on
/// its 1st. The test drives:
///   1. `sid-input`  -> served by `input` (hit 1, OK).
///   2. `sid-right`  -> `input` 502 (hit 2), `input1` 502 (hit 1), `right` OK;
///      the full three-attempt trace and affinity bookkeeping are verified.
///   3. follow-ups confirm `sid-right` re-prefers `input` once it recovers,
///      `sid-input` stays sticky, and a fresh session starts at `input` —
///      i.e. the `sid-right` fallback never leaked into other sessions.
#[tokio::test]
async fn proxy_v4_route_graph_affinity_is_session_scoped() {
    let input_hits = Arc::new(AtomicUsize::new(0));
    let input1_hits = Arc::new(AtomicUsize::new(0));
    let right_hits = Arc::new(AtomicUsize::new(0));
    // Provider "input": OK except on exactly its second hit (502).
    let input_counter = input_hits.clone();
    let input = axum::Router::new().route(
        "/v1/responses",
        post(move || {
            let input_counter = input_counter.clone();
            async move {
                let hit = input_counter.fetch_add(1, Ordering::SeqCst) + 1;
                if hit == 2 {
                    (
                        StatusCode::BAD_GATEWAY,
                        Json(serde_json::json!({ "provider": "input", "err": "quota" })),
                    )
                } else {
                    (
                        StatusCode::OK,
                        Json(serde_json::json!({ "provider": "input" })),
                    )
                }
            }
        }),
    );
    let (input_addr, input_handle) = spawn_axum_server(input);
    // Provider "input1": 502 on its first hit only.
    let input1_counter = input1_hits.clone();
    let input1 = axum::Router::new().route(
        "/v1/responses",
        post(move || {
            let input1_counter = input1_counter.clone();
            async move {
                let hit = input1_counter.fetch_add(1, Ordering::SeqCst) + 1;
                if hit == 1 {
                    (
                        StatusCode::BAD_GATEWAY,
                        Json(serde_json::json!({ "provider": "input1", "err": "quota" })),
                    )
                } else {
                    (
                        StatusCode::OK,
                        Json(serde_json::json!({ "provider": "input1" })),
                    )
                }
            }
        }),
    );
    let (input1_addr, input1_handle) = spawn_axum_server(input1);
    // Provider "right": always OK; the non-monthly fallback.
    let right_counter = right_hits.clone();
    let right = axum::Router::new().route(
        "/v1/responses",
        post(move || {
            let right_counter = right_counter.clone();
            async move {
                right_counter.fetch_add(1, Ordering::SeqCst);
                (
                    StatusCode::OK,
                    Json(serde_json::json!({ "provider": "right" })),
                )
            }
        }),
    );
    let (right_addr, right_handle) = spawn_axum_server(right);
    // One upstream attempt per provider, up to 4 provider-level failovers.
    let retry = RetryConfig {
        profile: Some(RetryProfileName::AggressiveFailover),
        upstream: Some(retry_layer_config(
            1,
            "502",
            Vec::new(),
            RetryStrategy::Failover,
        )),
        provider: Some(retry_layer_config(
            4,
            "502",
            Vec::new(),
            RetryStrategy::Failover,
        )),
        allow_cross_station_before_first_output: Some(true),
        transport_cooldown_secs: Some(0),
        cooldown_backoff_factor: Some(1),
        cooldown_backoff_max_secs: Some(0),
        ..RetryConfig::default()
    };
    let monthly_tags =
        std::collections::BTreeMap::from([("billing".to_string(), "monthly".to_string())]);
    // V4 config: tag-preferred root over {monthly_pool, right}; monthly_pool
    // is an ordered failover of {input, input1}.
    let v4 = ProxyConfigV4 {
        retry,
        codex: ServiceViewV4 {
            providers: std::collections::BTreeMap::from([
                (
                    "input".to_string(),
                    ProviderConfigV4 {
                        base_url: Some(format!("http://{input_addr}/v1")),
                        inline_auth: UpstreamAuth::default(),
                        tags: monthly_tags.clone(),
                        ..ProviderConfigV4::default()
                    },
                ),
                (
                    "input1".to_string(),
                    ProviderConfigV4 {
                        base_url: Some(format!("http://{input1_addr}/v1")),
                        inline_auth: UpstreamAuth::default(),
                        tags: monthly_tags,
                        ..ProviderConfigV4::default()
                    },
                ),
                (
                    "right".to_string(),
                    ProviderConfigV4 {
                        base_url: Some(format!("http://{right_addr}/v1")),
                        inline_auth: UpstreamAuth::default(),
                        ..ProviderConfigV4::default()
                    },
                ),
            ]),
            routing: Some(RoutingConfigV4 {
                entry: "monthly_first".to_string(),
                routes: std::collections::BTreeMap::from([
                    (
                        "monthly_first".to_string(),
                        RoutingNodeV4 {
                            strategy: RoutingPolicyV4::TagPreferred,
                            children: vec!["monthly_pool".to_string(), "right".to_string()],
                            prefer_tags: vec![std::collections::BTreeMap::from([(
                                "billing".to_string(),
                                "monthly".to_string(),
                            )])],
                            on_exhausted: crate::config::RoutingExhaustedActionV4::Continue,
                            ..RoutingNodeV4::default()
                        },
                    ),
                    (
                        "monthly_pool".to_string(),
                        RoutingNodeV4 {
                            strategy: RoutingPolicyV4::OrderedFailover,
                            children: vec!["input".to_string(), "input1".to_string()],
                            on_exhausted: crate::config::RoutingExhaustedActionV4::Continue,
                            ..RoutingNodeV4::default()
                        },
                    ),
                ]),
                ..RoutingConfigV4::default()
            }),
            ..ServiceViewV4::default()
        },
        ..ProxyConfigV4::default()
    };
    let runtime = crate::config::compile_v4_to_runtime(&v4).expect("compat runtime");
    let proxy = ProxyService::new_with_v4_source(
        Client::new(),
        Arc::new(runtime),
        Some(Arc::new(v4)),
        "codex",
        Arc::new(std::sync::Mutex::new(HashMap::new())),
    );
    let state = proxy.state.clone();
    let app = crate::proxy::router(proxy);
    let (proxy_addr, proxy_handle) = spawn_axum_server(app);
    let client = reqwest::Client::new();
    // Request 1 (sid-input): input hit 1 succeeds.
    let first = send_responses_json(&client, proxy_addr, Some("sid-input")).await;
    assert_eq!(first["provider"].as_str(), Some("input"));
    // Request 2 (sid-right): input 502, input1 502, falls through to right.
    let fallback = send_responses_json(&client, proxy_addr, Some("sid-right")).await;
    assert_eq!(fallback["provider"].as_str(), Some("right"));
    assert_eq!(input_hits.load(Ordering::SeqCst), 2);
    assert_eq!(input1_hits.load(Ordering::SeqCst), 1);
    assert_eq!(right_hits.load(Ordering::SeqCst), 1);
    // The route-graph path must not carry legacy station identity.
    let finished_after_fallback = state.list_recent_finished(10).await;
    let fallback_request = finished_after_fallback
        .iter()
        .find(|request| {
            request.session_id.as_deref() == Some("sid-right")
                && request.provider_id.as_deref() == Some("right")
        })
        .expect("fallback finished request");
    assert_eq!(fallback_request.station_name, None);
    assert_eq!(
        fallback_request
            .route_decision
            .as_ref()
            .and_then(|decision| {
                decision
                    .effective_station
                    .as_ref()
                    .map(|station| station.value.as_str())
            }),
        None
    );
    assert_eq!(
        fallback_request
            .route_decision
            .as_ref()
            .and_then(|decision| decision.endpoint_id.as_deref()),
        Some("default")
    );
    // Full three-attempt trace for the fallback request.
    let fallback_retry = fallback_request
        .retry
        .as_ref()
        .expect("fallback request retry trace");
    assert_eq!(
        fallback_retry
            .route_attempts
            .iter()
            .map(|attempt| attempt.provider_id.as_deref())
            .collect::<Vec<_>>(),
        vec![Some("input"), Some("input1"), Some("right")]
    );
    assert_eq!(
        fallback_retry
            .route_attempts
            .iter()
            .map(|attempt| attempt.provider_endpoint_key.as_deref())
            .collect::<Vec<_>>(),
        vec![
            Some("codex/input/default"),
            Some("codex/input1/default"),
            Some("codex/right/default"),
        ]
    );
    // Every attempt reports the configured budgets (provider=4, upstream=1).
    assert_eq!(
        fallback_retry
            .route_attempts
            .iter()
            .map(|attempt| (attempt.provider_max_attempts, attempt.upstream_max_attempts))
            .collect::<Vec<_>>(),
        vec![(Some(4), Some(1)), (Some(4), Some(1)), (Some(4), Some(1))]
    );
    // Avoid lists grow by one failed candidate index per failover step.
    assert_eq!(
        fallback_retry
            .route_attempts
            .iter()
            .map(|attempt| attempt.avoided_candidate_indices.clone())
            .collect::<Vec<_>>(),
        vec![Vec::<usize>::new(), vec![0], vec![0, 1]]
    );
    assert!(
        fallback_retry
            .route_attempts
            .iter()
            .all(|attempt| attempt.avoid_for_station.is_empty()),
        "route graph attempts should track candidate avoids, not station upstream avoids"
    );
    assert!(
        fallback_retry
            .route_attempts
            .iter()
            .all(|attempt| attempt.station_name.is_none() && attempt.upstream_index.is_none()),
        "route graph attempts should not serialize compatibility station/index as primary identity"
    );
    // After the fallback, sid-right's affinity records the 502 failover.
    let fallback_affinity_snapshot = state
        .get_session_route_affinity("sid-right")
        .await
        .expect("right affinity after fallback");
    assert_eq!(
        fallback_affinity_snapshot
            .provider_endpoint
            .provider_id
            .as_str(),
        "right"
    );
    assert_eq!(
        fallback_affinity_snapshot.change_reason.as_str(),
        "failover_after_status_502"
    );
    // Request 3 (sid-right again): input has recovered (hit 3 is OK), so the
    // tag-preferred route wins back over the recorded fallback affinity.
    let preferred_after_fallback =
        send_responses_json(&client, proxy_addr, Some("sid-right")).await;
    assert_eq!(preferred_after_fallback["provider"].as_str(), Some("input"));
    // Request 4 (sid-input): stays sticky on input.
    let sticky = send_responses_json(&client, proxy_addr, Some("sid-input")).await;
    assert_eq!(sticky["provider"].as_str(), Some("input"));
    // Request 5 (fresh session): also starts at input — sid-right's earlier
    // fallback did not leak across sessions.
    let new_session = send_responses_json(&client, proxy_addr, Some("sid-new")).await;
    assert_eq!(new_session["provider"].as_str(), Some("input"));
    assert_eq!(input_hits.load(Ordering::SeqCst), 5);
    assert_eq!(input1_hits.load(Ordering::SeqCst), 1);
    assert_eq!(right_hits.load(Ordering::SeqCst), 1);
    // Final affinity table: both sessions now point at input; sid-right's
    // reason reflects the post-recovery target change.
    let affinities = state.list_session_route_affinities().await;
    assert_eq!(
        affinities
            .get("sid-input")
            .map(|affinity| affinity.provider_endpoint.provider_id.as_str()),
        Some("input")
    );
    let fallback_affinity = affinities.get("sid-right").expect("right affinity");
    assert_eq!(
        fallback_affinity.provider_endpoint.provider_id.as_str(),
        "input"
    );
    assert_eq!(
        fallback_affinity.provider_endpoint.stable_key(),
        "codex/input/default"
    );
    assert_eq!(fallback_affinity.change_reason.as_str(), "target_changed");
    // Session identity cards expose the same affinity.
    let cards = state.list_session_identity_cards(20).await;
    let right_card = cards
        .iter()
        .find(|card| card.session_id.as_deref() == Some("sid-right"))
        .expect("right card");
    assert_eq!(
        right_card
            .route_affinity
            .as_ref()
            .map(|affinity| affinity.provider_endpoint.provider_id.as_str()),
        Some("input")
    );
    proxy_handle.abort();
    input_handle.abort();
    input1_handle.abort();
    right_handle.abort();
}
/// V4 route graph: provider health must not be written into the synthetic
/// "routing" load-balancer state.
///
/// `primary` fails only its first hit; `backup` always succeeds. The first
/// request fails over to backup, after which the shared LB-state map must
/// show no failure counts under the "routing" key. A second request then
/// confirms primary is re-selected once it recovers.
#[tokio::test]
async fn proxy_v4_route_graph_health_does_not_write_synthetic_routing_lb_state() {
    let primary_hits = Arc::new(AtomicUsize::new(0));
    let backup_hits = Arc::new(AtomicUsize::new(0));
    // Primary: 502 on the first hit only.
    let primary_counter = primary_hits.clone();
    let primary = axum::Router::new().route(
        "/v1/responses",
        post(move || {
            let primary_counter = primary_counter.clone();
            async move {
                let hit = primary_counter.fetch_add(1, Ordering::SeqCst) + 1;
                if hit == 1 {
                    (
                        StatusCode::BAD_GATEWAY,
                        Json(serde_json::json!({ "provider": "primary", "err": "first" })),
                    )
                } else {
                    (
                        StatusCode::OK,
                        Json(serde_json::json!({ "provider": "primary" })),
                    )
                }
            }
        }),
    );
    let (primary_addr, primary_handle) = spawn_axum_server(primary);
    // Backup: always OK.
    let backup_counter = backup_hits.clone();
    let backup = axum::Router::new().route(
        "/v1/responses",
        post(move || {
            let backup_counter = backup_counter.clone();
            async move {
                backup_counter.fetch_add(1, Ordering::SeqCst);
                (
                    StatusCode::OK,
                    Json(serde_json::json!({ "provider": "backup" })),
                )
            }
        }),
    );
    let (backup_addr, backup_handle) = spawn_axum_server(backup);
    // One upstream attempt, two provider-level attempts: exactly one failover.
    let retry = RetryConfig {
        upstream: Some(retry_layer_config(
            1,
            "502",
            Vec::new(),
            RetryStrategy::Failover,
        )),
        provider: Some(retry_layer_config(
            2,
            "502",
            Vec::new(),
            RetryStrategy::Failover,
        )),
        allow_cross_station_before_first_output: Some(true),
        transport_cooldown_secs: Some(0),
        cooldown_backoff_factor: Some(1),
        cooldown_backoff_max_secs: Some(0),
        ..RetryConfig::default()
    };
    // Single ordered-failover node: primary then backup.
    let v4 = ProxyConfigV4 {
        retry,
        codex: ServiceViewV4 {
            providers: std::collections::BTreeMap::from([
                (
                    "primary".to_string(),
                    ProviderConfigV4 {
                        base_url: Some(format!("http://{primary_addr}/v1")),
                        inline_auth: UpstreamAuth::default(),
                        ..ProviderConfigV4::default()
                    },
                ),
                (
                    "backup".to_string(),
                    ProviderConfigV4 {
                        base_url: Some(format!("http://{backup_addr}/v1")),
                        inline_auth: UpstreamAuth::default(),
                        ..ProviderConfigV4::default()
                    },
                ),
            ]),
            routing: Some(RoutingConfigV4 {
                entry: "monthly_first".to_string(),
                routes: std::collections::BTreeMap::from([(
                    "monthly_first".to_string(),
                    RoutingNodeV4 {
                        strategy: RoutingPolicyV4::OrderedFailover,
                        children: vec!["primary".to_string(), "backup".to_string()],
                        ..RoutingNodeV4::default()
                    },
                )]),
                ..RoutingConfigV4::default()
            }),
            ..ServiceViewV4::default()
        },
        ..ProxyConfigV4::default()
    };
    let runtime = crate::config::compile_v4_to_runtime(&v4).expect("compat runtime");
    // Keep a handle to the LB-state map so its contents can be inspected.
    let lb_states = Arc::new(std::sync::Mutex::new(HashMap::new()));
    let proxy = ProxyService::new_with_v4_source(
        Client::new(),
        Arc::new(runtime),
        Some(Arc::new(v4)),
        "codex",
        lb_states.clone(),
    );
    let app = crate::proxy::router(proxy);
    let (proxy_addr, proxy_handle) = spawn_axum_server(app);
    let client = reqwest::Client::new();
    // Request 1: primary 502 (hit 1) -> failover to backup.
    let fallback = send_responses_json(&client, proxy_addr, Some("sid-failover")).await;
    assert_eq!(fallback["provider"].as_str(), Some("backup"));
    assert_eq!(primary_hits.load(Ordering::SeqCst), 1);
    assert_eq!(backup_hits.load(Ordering::SeqCst), 1);
    {
        // The "routing" LB entry must be absent or carry zero failure counts.
        let guard = lb_states.lock().expect("lb states");
        assert!(
            guard
                .get("routing")
                .is_none_or(|entry| entry.failure_counts.iter().all(|count| *count == 0)),
            "route graph branch should not write provider health into synthetic routing LB state"
        );
    }
    // Request 2: primary has recovered (hit 2 is OK) and is preferred again.
    let preferred = send_responses_json(&client, proxy_addr, Some("sid-failover")).await;
    assert_eq!(preferred["provider"].as_str(), Some("primary"));
    assert_eq!(primary_hits.load(Ordering::SeqCst), 2);
    assert_eq!(backup_hits.load(Ordering::SeqCst), 1);
    proxy_handle.abort();
    primary_handle.abort();
    backup_handle.abort();
}
/// V4 conditional routing: the `when.model` condition selects the `then`
/// branch ("large") for model "gpt-5" and the `default_route` ("small")
/// for any other model; the recorded route paths must reflect each branch.
#[tokio::test]
async fn proxy_v4_conditional_routing_selects_branch_by_request_model() {
    let small_hits = Arc::new(AtomicUsize::new(0));
    let large_hits = Arc::new(AtomicUsize::new(0));
    // "small" provider: always OK.
    let small_counter = small_hits.clone();
    let small = axum::Router::new().route(
        "/v1/responses",
        post(move || async move {
            small_counter.fetch_add(1, Ordering::SeqCst);
            (
                StatusCode::OK,
                Json(serde_json::json!({ "provider": "small" })),
            )
        }),
    );
    let (small_addr, small_handle) = spawn_axum_server(small);
    // "large" provider: always OK.
    let large_counter = large_hits.clone();
    let large = axum::Router::new().route(
        "/v1/responses",
        post(move || async move {
            large_counter.fetch_add(1, Ordering::SeqCst);
            (
                StatusCode::OK,
                Json(serde_json::json!({ "provider": "large" })),
            )
        }),
    );
    let (large_addr, large_handle) = spawn_axum_server(large);
    // Conditional root: model == "gpt-5" -> large, otherwise -> small.
    let v4 = ProxyConfigV4 {
        codex: ServiceViewV4 {
            providers: std::collections::BTreeMap::from([
                (
                    "small".to_string(),
                    ProviderConfigV4 {
                        base_url: Some(format!("http://{}/v1", small_addr)),
                        inline_auth: UpstreamAuth::default(),
                        ..ProviderConfigV4::default()
                    },
                ),
                (
                    "large".to_string(),
                    ProviderConfigV4 {
                        base_url: Some(format!("http://{}/v1", large_addr)),
                        inline_auth: UpstreamAuth::default(),
                        ..ProviderConfigV4::default()
                    },
                ),
            ]),
            routing: Some(RoutingConfigV4 {
                entry: "root".to_string(),
                routes: std::collections::BTreeMap::from([(
                    "root".to_string(),
                    RoutingNodeV4 {
                        strategy: RoutingPolicyV4::Conditional,
                        when: Some(RoutingConditionV4 {
                            model: Some("gpt-5".to_string()),
                            ..RoutingConditionV4::default()
                        }),
                        then: Some("large".to_string()),
                        default_route: Some("small".to_string()),
                        ..RoutingNodeV4::default()
                    },
                )]),
                ..RoutingConfigV4::default()
            }),
            ..ServiceViewV4::default()
        },
        ..ProxyConfigV4::default()
    };
    let runtime = crate::config::compile_v4_to_runtime(&v4).expect("compat runtime");
    let proxy = ProxyService::new_with_v4_source(
        Client::new(),
        Arc::new(runtime),
        Some(Arc::new(v4)),
        "codex",
        Arc::new(std::sync::Mutex::new(HashMap::new())),
    );
    let state = proxy.state.clone();
    let app = crate::proxy::router(proxy);
    let (proxy_addr, proxy_handle) = spawn_axum_server(app);
    let client = reqwest::Client::new();
    // Model "gpt-5" matches the condition -> large branch.
    let large_resp = client
        .post(format!("http://{}/v1/responses", proxy_addr))
        .header("content-type", "application/json")
        .body(r#"{"model":"gpt-5","input":"hi"}"#)
        .send()
        .await
        .expect("send large branch")
        .error_for_status()
        .expect("large branch status")
        .text()
        .await
        .expect("large branch body");
    assert!(large_resp.contains(r#""provider":"large"#));
    // Any other model ("gpt-4.1") -> default route, small branch.
    let small_resp = client
        .post(format!("http://{}/v1/responses", proxy_addr))
        .header("content-type", "application/json")
        .body(r#"{"model":"gpt-4.1","input":"hi"}"#)
        .send()
        .await
        .expect("send small branch")
        .error_for_status()
        .expect("small branch status")
        .text()
        .await
        .expect("small branch body");
    assert!(small_resp.contains(r#""provider":"small"#));
    assert_eq!(large_hits.load(Ordering::SeqCst), 1);
    assert_eq!(small_hits.load(Ordering::SeqCst), 1);
    // Both branch-specific route paths must appear in the finished records.
    let finished = state.list_recent_finished(10).await;
    let route_paths = finished
        .iter()
        .filter_map(|request| request.route_decision.as_ref())
        .map(|decision| decision.route_path.clone())
        .collect::<Vec<_>>();
    assert!(
        route_paths
            .iter()
            .any(|path| path == &vec!["root", "large"])
    );
    assert!(
        route_paths
            .iter()
            .any(|path| path == &vec!["root", "small"])
    );
    proxy_handle.abort();
    small_handle.abort();
    large_handle.abort();
}
/// `SameUpstream` strategy: a 502 from upstream 1 must be retried against
/// upstream 1 itself (which succeeds on its second hit); upstream 2 must
/// never be contacted.
#[tokio::test]
async fn proxy_same_upstream_retries_502_then_succeeds_without_failover() {
    let upstream1_hits = Arc::new(AtomicUsize::new(0));
    let upstream2_hits = Arc::new(AtomicUsize::new(0));
    // Upstream 1: 502 on the first hit, 200 afterwards.
    let u1_hits = upstream1_hits.clone();
    let upstream1 = axum::Router::new().route(
        "/v1/responses",
        post(move || async move {
            let n = u1_hits.fetch_add(1, Ordering::SeqCst);
            if n == 0 {
                (
                    StatusCode::BAD_GATEWAY,
                    Json(serde_json::json!({ "err": "first attempt 502" })),
                )
            } else {
                (
                    StatusCode::OK,
                    Json(serde_json::json!({ "ok": true, "upstream": 1 })),
                )
            }
        }),
    );
    let (u1_addr, u1_handle) = spawn_axum_server(upstream1);
    // Upstream 2: always OK — but must stay untouched in this test.
    let u2_hits = upstream2_hits.clone();
    let upstream2 = axum::Router::new().route(
        "/v1/responses",
        post(move || async move {
            u2_hits.fetch_add(1, Ordering::SeqCst);
            (
                StatusCode::OK,
                Json(serde_json::json!({ "ok": true, "upstream": 2 })),
            )
        }),
    );
    let (u2_addr, u2_handle) = spawn_axum_server(upstream2);
    let proxy_client = Client::new();
    // Two attempts on 502, pinned to the same upstream (no failover).
    let retry = retry_config(2, "502", Vec::new(), RetryStrategy::SameUpstream);
    let cfg = make_proxy_config(
        vec![
            UpstreamConfig {
                base_url: format!("http://{}/v1", u1_addr),
                auth: UpstreamAuth {
                    auth_token: None,
                    auth_token_env: None,
                    api_key: None,
                    api_key_env: None,
                },
                tags: {
                    let mut t = HashMap::new();
                    t.insert("provider_id".to_string(), "u1".to_string());
                    t
                },
                supported_models: HashMap::new(),
                model_mapping: HashMap::new(),
            },
            UpstreamConfig {
                base_url: format!("http://{}/v1", u2_addr),
                auth: UpstreamAuth {
                    auth_token: None,
                    auth_token_env: None,
                    api_key: None,
                    api_key_env: None,
                },
                tags: {
                    let mut t = HashMap::new();
                    t.insert("provider_id".to_string(), "u2".to_string());
                    t
                },
                supported_models: HashMap::new(),
                model_mapping: HashMap::new(),
            },
        ],
        retry,
    );
    let proxy = ProxyService::new(
        proxy_client,
        Arc::new(cfg),
        "codex",
        Arc::new(std::sync::Mutex::new(HashMap::new())),
    );
    let app = crate::proxy::router(proxy);
    let (proxy_addr, proxy_handle) = spawn_axum_server(app);
    let client = reqwest::Client::new();
    let resp = client
        .post(format!("http://{}/v1/responses", proxy_addr))
        .header("content-type", "application/json")
        .body(r#"{"model":"gpt","input":"hi"}"#)
        .send()
        .await
        .expect("send");
    // Second attempt against upstream 1 succeeds; body must come from it.
    assert_eq!(resp.status(), StatusCode::OK);
    let body = resp.text().await.expect("text");
    assert!(
        body.contains(r#""upstream":1"#),
        "expected response from upstream1, got: {body}"
    );
    // Exactly two hits on upstream 1 (fail + retry); zero on upstream 2.
    assert_eq!(upstream1_hits.load(Ordering::SeqCst), 2);
    assert_eq!(upstream2_hits.load(Ordering::SeqCst), 0);
    proxy_handle.abort();
    u1_handle.abort();
    u2_handle.abort();
}
/// Cross-request 502 penalty: with a 1-attempt upstream layer and a
/// 2-attempt provider failover layer plus a 60s transport cooldown, the
/// first (streaming) request fails over u1 -> u2 and records a two-attempt
/// retry trace; a follow-up request should then go straight to u2 because
/// u1 is penalized, leaving u1 with exactly one total hit.
#[tokio::test]
async fn proxy_failover_across_requests_penalizes_502_when_no_internal_retry() {
    let upstream1_hits = Arc::new(AtomicUsize::new(0));
    let upstream2_hits = Arc::new(AtomicUsize::new(0));
    // Upstream 1: always 502.
    let u1_hits = upstream1_hits.clone();
    let upstream1 = axum::Router::new().route(
        "/v1/responses",
        post(move || async move {
            u1_hits.fetch_add(1, Ordering::SeqCst);
            (
                StatusCode::BAD_GATEWAY,
                Json(serde_json::json!({ "err": "always 502" })),
            )
        }),
    );
    let (u1_addr, u1_handle) = spawn_axum_server(upstream1);
    // Upstream 2: 200 with an SSE body carrying an "upstream":2 marker.
    let u2_hits = upstream2_hits.clone();
    let upstream2 = axum::Router::new().route(
        "/v1/responses",
        post(move || {
            u2_hits.fetch_add(1, Ordering::SeqCst);
            async move {
                let s = stream::iter(
                    vec![Bytes::from_static(
                        b"data: {\"ok\":true,\"upstream\":2}\n\n",
                    )]
                    .into_iter()
                    .map(Ok::<Bytes, Infallible>),
                );
                let mut resp = Response::new(Body::from_stream(s));
                *resp.status_mut() = StatusCode::OK;
                resp.headers_mut().insert(
                    axum::http::header::CONTENT_TYPE,
                    HeaderValue::from_static("text/event-stream"),
                );
                resp
            }
        }),
    );
    let (u2_addr, u2_handle) = spawn_axum_server(upstream2);
    let proxy_client = Client::new();
    // 1 same-upstream attempt, 2 provider failover attempts, 60s transport
    // cooldown so the penalty persists across requests within the test.
    let retry = RetryConfig {
        upstream: Some(crate::config::RetryLayerConfig {
            max_attempts: Some(1),
            backoff_ms: Some(0),
            backoff_max_ms: Some(0),
            jitter_ms: Some(0),
            on_status: Some("502".to_string()),
            on_class: Some(Vec::new()),
            strategy: Some(RetryStrategy::SameUpstream),
        }),
        provider: Some(crate::config::RetryLayerConfig {
            max_attempts: Some(2),
            backoff_ms: Some(0),
            backoff_max_ms: Some(0),
            jitter_ms: Some(0),
            on_status: Some("502".to_string()),
            on_class: Some(Vec::new()),
            strategy: Some(RetryStrategy::Failover),
        }),
        allow_cross_station_before_first_output: Some(true),
        cloudflare_challenge_cooldown_secs: Some(0),
        cloudflare_timeout_cooldown_secs: Some(0),
        transport_cooldown_secs: Some(60),
        cooldown_backoff_factor: Some(1),
        cooldown_backoff_max_secs: Some(0),
        ..Default::default()
    };
    let cfg = make_proxy_config(
        vec![
            UpstreamConfig {
                base_url: format!("http://{}/v1", u1_addr),
                auth: UpstreamAuth {
                    auth_token: None,
                    auth_token_env: None,
                    api_key: None,
                    api_key_env: None,
                },
                tags: {
                    let mut t = HashMap::new();
                    t.insert("provider_id".to_string(), "u1".to_string());
                    t
                },
                supported_models: HashMap::new(),
                model_mapping: HashMap::new(),
            },
            UpstreamConfig {
                base_url: format!("http://{}/v1", u2_addr),
                auth: UpstreamAuth {
                    auth_token: None,
                    auth_token_env: None,
                    api_key: None,
                    api_key_env: None,
                },
                tags: {
                    let mut t = HashMap::new();
                    t.insert("provider_id".to_string(), "u2".to_string());
                    t
                },
                supported_models: HashMap::new(),
                model_mapping: HashMap::new(),
            },
        ],
        retry,
    );
    let proxy = ProxyService::new(
        proxy_client,
        Arc::new(cfg),
        "codex",
        Arc::new(std::sync::Mutex::new(HashMap::new())),
    );
    let state = proxy.state.clone();
    let app = crate::proxy::router(proxy);
    let (proxy_addr, proxy_handle) = spawn_axum_server(app);
    let client = reqwest::Client::new();
    // Request 1 (SSE): u1 502, failover, u2 streams the success.
    let resp1 = client
        .post(format!("http://{}/v1/responses", proxy_addr))
        .header("content-type", "application/json")
        .header("accept", "text/event-stream")
        .body(r#"{"model":"gpt","input":"hi"}"#)
        .send()
        .await
        .expect("send");
    assert_eq!(resp1.status(), StatusCode::OK);
    let body1 = resp1.bytes().await.expect("read bytes");
    let body1_s = String::from_utf8_lossy(&body1);
    assert!(
        body1_s.contains(r#""upstream":2"#),
        "expected response from upstream2, got: {body1_s}"
    );
    // Poll until the finished-request record (with retry info) is visible;
    // the record is written asynchronously after the stream completes.
    let mut finished = Vec::new();
    for _ in 0..100 {
        finished = state.list_recent_finished(10).await;
        if finished.iter().any(|request| request.retry.is_some()) {
            break;
        }
        sleep(Duration::from_millis(20)).await;
    }
    let retry = finished
        .iter()
        .find_map(|request| request.retry.as_ref())
        .expect("streaming failover should record retry info");
    // Trace: attempt 1 on u1 failed by status, attempt 2 on u2 completed.
    assert_eq!(retry.attempts, 2);
    assert_eq!(retry.route_attempts.len(), 2);
    assert_eq!(retry.route_attempts[0].decision, "failed_status");
    assert_eq!(retry.route_attempts[0].provider_id.as_deref(), Some("u1"));
    assert_eq!(retry.route_attempts[0].provider_attempt, Some(1));
    assert_eq!(retry.route_attempts[1].decision, "completed");
    assert_eq!(retry.route_attempts[1].provider_id.as_deref(), Some("u2"));
    assert_eq!(retry.route_attempts[1].provider_attempt, Some(1));
    assert_eq!(retry.route_attempts[1].upstream_index, Some(1));
    assert!(retry.route_attempts[1].upstream_headers_ms.is_some());
    // Request 2: retried up to 3 times at the client level to absorb any
    // lag in the penalty bookkeeping; the last OK response is kept.
    let (status2, body2) = {
        let mut last_status = StatusCode::INTERNAL_SERVER_ERROR;
        let mut last_body: Bytes = Bytes::new();
        for attempt in 0..3 {
            let resp2 = client
                .post(format!("http://{}/v1/responses", proxy_addr))
                .header("content-type", "application/json")
                .header("accept", "text/event-stream")
                .body(r#"{"model":"gpt","input":"hi"}"#)
                .send()
                .await
                .expect("send");
            last_status = resp2.status();
            last_body = resp2.bytes().await.expect("read bytes");
            if last_status == StatusCode::OK {
                break;
            }
            if attempt < 2 {
                tokio::time::sleep(std::time::Duration::from_millis(30)).await;
            }
        }
        (last_status, last_body)
    };
    assert_eq!(status2, StatusCode::OK);
    let body_s = String::from_utf8_lossy(&body2);
    assert!(
        body_s.contains(r#""upstream":2"#),
        "expected response from upstream2, got: {body_s}"
    );
    // u1 was hit only during request 1; the cooldown kept request 2 off it.
    assert_eq!(upstream1_hits.load(Ordering::SeqCst), 1);
    assert_eq!(upstream2_hits.load(Ordering::SeqCst), 2);
    proxy_handle.abort();
    u1_handle.abort();
    u2_handle.abort();
}
/// Cross-request transport-error penalty: upstream 1 points at a reserved
/// but unused local address (connection refused -> transport error), with
/// `upstream_transport_error` in the retry class filter and a 60s transport
/// cooldown. Both requests must be answered by upstream 2, and the second
/// request should skip the dead upstream 1 thanks to the recorded penalty.
#[tokio::test]
async fn proxy_failover_across_requests_penalizes_transport_error_when_no_internal_retry() {
    let upstream2_hits = Arc::new(AtomicUsize::new(0));
    // Upstream 2: 200 with an SSE body carrying an "upstream":2 marker.
    let u2_hits = upstream2_hits.clone();
    let upstream2 = axum::Router::new().route(
        "/v1/responses",
        post(move || {
            u2_hits.fetch_add(1, Ordering::SeqCst);
            async move {
                let s = stream::iter(
                    vec![Bytes::from_static(
                        b"data: {\"ok\":true,\"upstream\":2}\n\n",
                    )]
                    .into_iter()
                    .map(Ok::<Bytes, Infallible>),
                );
                let mut resp = Response::new(Body::from_stream(s));
                *resp.status_mut() = StatusCode::OK;
                resp.headers_mut().insert(
                    axum::http::header::CONTENT_TYPE,
                    HeaderValue::from_static("text/event-stream"),
                );
                resp
            }
        }),
    );
    let (u2_addr, u2_handle) = spawn_axum_server(upstream2);
    // No server listens on this address; connecting to it fails at the
    // transport level, which is the failure class under test.
    let unused = reserve_unused_local_addr();
    let proxy_client = Client::new();
    // 1 attempt, failover on 502 or transport error, 60s transport cooldown.
    let retry = retry_config_with_cooldowns(
        1,
        "502",
        vec!["upstream_transport_error".to_string()],
        RetryStrategy::Failover,
        0,
        0,
        60,
    );
    let cfg = make_proxy_config(
        vec![
            UpstreamConfig {
                base_url: format!("http://{}/v1", unused),
                auth: UpstreamAuth {
                    auth_token: None,
                    auth_token_env: None,
                    api_key: None,
                    api_key_env: None,
                },
                tags: {
                    let mut t = HashMap::new();
                    t.insert("provider_id".to_string(), "u1".to_string());
                    t
                },
                supported_models: HashMap::new(),
                model_mapping: HashMap::new(),
            },
            UpstreamConfig {
                base_url: format!("http://{}/v1", u2_addr),
                auth: UpstreamAuth {
                    auth_token: None,
                    auth_token_env: None,
                    api_key: None,
                    api_key_env: None,
                },
                tags: {
                    let mut t = HashMap::new();
                    t.insert("provider_id".to_string(), "u2".to_string());
                    t
                },
                supported_models: HashMap::new(),
                model_mapping: HashMap::new(),
            },
        ],
        retry,
    );
    let proxy = ProxyService::new(
        proxy_client,
        Arc::new(cfg),
        "codex",
        Arc::new(std::sync::Mutex::new(HashMap::new())),
    );
    let app = crate::proxy::router(proxy);
    let (proxy_addr, proxy_handle) = spawn_axum_server(app);
    let client = reqwest::Client::new();
    // Request 1: u1 connection fails, failover delivers u2's stream.
    let resp1 = client
        .post(format!("http://{}/v1/responses", proxy_addr))
        .header("content-type", "application/json")
        .header("accept", "text/event-stream")
        .body(r#"{"model":"gpt","input":"hi"}"#)
        .send()
        .await
        .expect("send");
    assert_eq!(resp1.status(), StatusCode::OK);
    let body1 = resp1.bytes().await.expect("read bytes");
    let body1_s = String::from_utf8_lossy(&body1);
    assert!(
        body1_s.contains(r#""upstream":2"#),
        "expected response from upstream2, got: {body1_s}"
    );
    // Request 2: the transport cooldown should keep u1 out of rotation.
    let resp2 = client
        .post(format!("http://{}/v1/responses", proxy_addr))
        .header("content-type", "application/json")
        .header("accept", "text/event-stream")
        .body(r#"{"model":"gpt","input":"hi"}"#)
        .send()
        .await
        .expect("send");
    assert_eq!(resp2.status(), StatusCode::OK);
    let body = resp2.bytes().await.expect("read bytes");
    let body_s = String::from_utf8_lossy(&body);
    assert!(
        body_s.contains(r#""upstream":2"#),
        "expected response from upstream2, got: {body_s}"
    );
    // Upstream 2 served both requests.
    assert_eq!(upstream2_hits.load(Ordering::SeqCst), 2);
    proxy_handle.abort();
    u2_handle.abort();
}
// Scenario: upstream 1 always returns a Cloudflare challenge page (403 HTML
// carrying the /cdn-cgi/, challenge-platform and __CF$cv$params markers plus
// `server: cloudflare` / `cf-ray` headers); upstream 2 always streams a
// successful SSE response. Retry is configured with max_attempts = 1 (so no
// retry happens inside a single request) and a 60s cloudflare-challenge
// cooldown. Expectation: request 1 fails over to upstream 2 and puts
// upstream 1 on cooldown; request 2 then skips upstream 1 entirely.
#[tokio::test]
async fn proxy_failover_across_requests_penalizes_cloudflare_challenge_when_no_internal_retry() {
    let upstream1_hits = Arc::new(AtomicUsize::new(0));
    let upstream2_hits = Arc::new(AtomicUsize::new(0));
    let u1_hits = upstream1_hits.clone();
    // Mock upstream 1: canned Cloudflare challenge response.
    let upstream1 = axum::Router::new().route(
        "/v1/responses",
        post(move || async move {
            u1_hits.fetch_add(1, Ordering::SeqCst);
            let mut resp = Response::new(Body::from(
                "<html><body>/cdn-cgi/ challenge-platform __CF$cv$params</body></html>",
            ));
            *resp.status_mut() = StatusCode::FORBIDDEN;
            resp.headers_mut().insert(
                axum::http::header::CONTENT_TYPE,
                HeaderValue::from_static("text/html; charset=utf-8"),
            );
            resp.headers_mut()
                .insert("server", HeaderValue::from_static("cloudflare"));
            resp.headers_mut()
                .insert("cf-ray", HeaderValue::from_static("test"));
            resp
        }),
    );
    let (u1_addr, u1_handle) = spawn_axum_server(upstream1);
    let u2_hits = upstream2_hits.clone();
    // Mock upstream 2: healthy SSE stream identifying itself as upstream 2.
    let upstream2 = axum::Router::new().route(
        "/v1/responses",
        post(move || {
            u2_hits.fetch_add(1, Ordering::SeqCst);
            async move {
                let s = stream::iter(
                    vec![Bytes::from_static(
                        b"data: {\"ok\":true,\"upstream\":2}\n\n",
                    )]
                    .into_iter()
                    .map(Ok::<Bytes, Infallible>),
                );
                let mut resp = Response::new(Body::from_stream(s));
                *resp.status_mut() = StatusCode::OK;
                resp.headers_mut().insert(
                    axum::http::header::CONTENT_TYPE,
                    HeaderValue::from_static("text/event-stream"),
                );
                resp
            }
        }),
    );
    let (u2_addr, u2_handle) = spawn_axum_server(upstream2);
    let proxy_client = Client::new();
    // One attempt, failover on 502 / cloudflare_challenge, 60s challenge
    // cooldown; the timeout and transport cooldowns are disabled (0s).
    let retry = retry_config_with_cooldowns(
        1,
        "502",
        vec!["cloudflare_challenge".to_string()],
        RetryStrategy::Failover,
        60,
        0,
        0,
    );
    let cfg = make_proxy_config(
        vec![
            UpstreamConfig {
                base_url: format!("http://{}/v1", u1_addr),
                auth: UpstreamAuth {
                    auth_token: None,
                    auth_token_env: None,
                    api_key: None,
                    api_key_env: None,
                },
                tags: {
                    let mut t = HashMap::new();
                    t.insert("provider_id".to_string(), "u1".to_string());
                    t
                },
                supported_models: HashMap::new(),
                model_mapping: HashMap::new(),
            },
            UpstreamConfig {
                base_url: format!("http://{}/v1", u2_addr),
                auth: UpstreamAuth {
                    auth_token: None,
                    auth_token_env: None,
                    api_key: None,
                    api_key_env: None,
                },
                tags: {
                    let mut t = HashMap::new();
                    t.insert("provider_id".to_string(), "u2".to_string());
                    t
                },
                supported_models: HashMap::new(),
                model_mapping: HashMap::new(),
            },
        ],
        retry,
    );
    let proxy = ProxyService::new(
        proxy_client,
        Arc::new(cfg),
        "codex",
        Arc::new(std::sync::Mutex::new(HashMap::new())),
    );
    let app = crate::proxy::router(proxy);
    let (proxy_addr, proxy_handle) = spawn_axum_server(app);
    let client = reqwest::Client::new();
    // Request 1: upstream 1 serves the challenge, so the proxy should fail
    // over and answer from upstream 2.
    let resp1 = client
        .post(format!("http://{}/v1/responses", proxy_addr))
        .header("content-type", "application/json")
        .header("accept", "text/event-stream")
        .body(r#"{"model":"gpt","input":"hi"}"#)
        .send()
        .await
        .expect("send");
    assert_eq!(resp1.status(), StatusCode::OK);
    let body1 = resp1.bytes().await.expect("read bytes");
    let body1_s = String::from_utf8_lossy(&body1);
    assert!(
        body1_s.contains(r#""upstream":2"#),
        "expected response from upstream2, got: {body1_s}"
    );
    // Request 2: upstream 1 is now on the 60s challenge cooldown and must be
    // skipped without being contacted.
    let resp2 = client
        .post(format!("http://{}/v1/responses", proxy_addr))
        .header("content-type", "application/json")
        .header("accept", "text/event-stream")
        .body(r#"{"model":"gpt","input":"hi"}"#)
        .send()
        .await
        .expect("send");
    assert_eq!(resp2.status(), StatusCode::OK);
    let body2 = resp2.bytes().await.expect("read bytes");
    let body2_s = String::from_utf8_lossy(&body2);
    assert!(
        body2_s.contains(r#""upstream":2"#),
        "expected response from upstream2, got: {body2_s}"
    );
    // Upstream 1 was tried exactly once (request 1 only); upstream 2 served both.
    assert_eq!(upstream1_hits.load(Ordering::SeqCst), 1);
    assert_eq!(upstream2_hits.load(Ordering::SeqCst), 2);
    proxy_handle.abort();
    u1_handle.abort();
    u2_handle.abort();
}
// Scenario (multi service-config failover): the active "primary" config
// (level 1) has one upstream that always answers 502; the "backup" config
// (level 2) has one upstream that streams a successful SSE response.
// Cross-station failover before first output is allowed, and a 60s cooldown
// is configured. Expectation: request 1 fails over primary -> backup;
// request 2 skips the cooled-down primary and goes straight to backup.
#[tokio::test]
async fn proxy_multi_config_failover_across_requests_respects_cooldown() {
    let primary_hits = Arc::new(AtomicUsize::new(0));
    let backup_hits = Arc::new(AtomicUsize::new(0));
    let p_hits = primary_hits.clone();
    // Primary upstream: always 502.
    let primary = axum::Router::new().route(
        "/v1/responses",
        post(move || async move {
            p_hits.fetch_add(1, Ordering::SeqCst);
            (
                StatusCode::BAD_GATEWAY,
                Json(serde_json::json!({ "err": "primary 502" })),
            )
        }),
    );
    let (p_addr, p_handle) = spawn_axum_server(primary);
    let b_hits = backup_hits.clone();
    // Backup upstream: healthy SSE stream identifying itself as "backup".
    let backup = axum::Router::new().route(
        "/v1/responses",
        post(move || {
            b_hits.fetch_add(1, Ordering::SeqCst);
            async move {
                let s = stream::iter(
                    vec![Bytes::from_static(
                        b"data: {\"ok\":true,\"upstream\":\"backup\"}\n\n",
                    )]
                    .into_iter()
                    .map(Ok::<Bytes, Infallible>),
                );
                let mut resp = Response::new(Body::from_stream(s));
                *resp.status_mut() = StatusCode::OK;
                resp.headers_mut().insert(
                    axum::http::header::CONTENT_TYPE,
                    HeaderValue::from_static("text/event-stream"),
                );
                resp
            }
        }),
    );
    let (b_addr, b_handle) = spawn_axum_server(backup);
    // Upstream layer: a single attempt (no same-upstream retry). Provider
    // layer: two attempts with Failover on 502 — this is what crosses from
    // the primary config to the backup config.
    let retry = RetryConfig {
        upstream: Some(crate::config::RetryLayerConfig {
            max_attempts: Some(1),
            backoff_ms: Some(0),
            backoff_max_ms: Some(0),
            jitter_ms: Some(0),
            on_status: Some("502".to_string()),
            on_class: Some(Vec::new()),
            strategy: Some(RetryStrategy::SameUpstream),
        }),
        provider: Some(crate::config::RetryLayerConfig {
            max_attempts: Some(2),
            backoff_ms: Some(0),
            backoff_max_ms: Some(0),
            jitter_ms: Some(0),
            on_status: Some("502".to_string()),
            on_class: Some(Vec::new()),
            strategy: Some(RetryStrategy::Failover),
        }),
        allow_cross_station_before_first_output: Some(true),
        cloudflare_challenge_cooldown_secs: Some(0),
        cloudflare_timeout_cooldown_secs: Some(0),
        // 60s cooldown keeps the failed primary avoided on the second request.
        transport_cooldown_secs: Some(60),
        cooldown_backoff_factor: Some(1),
        cooldown_backoff_max_secs: Some(0),
        ..Default::default()
    };
    // Two service configs: "primary" (active, level 1) and "backup" (level 2).
    let mut mgr = ServiceConfigManager {
        active: Some("primary".to_string()),
        ..Default::default()
    };
    mgr.configs.insert(
        "primary".to_string(),
        ServiceConfig {
            name: "primary".to_string(),
            alias: None,
            enabled: true,
            level: 1,
            upstreams: vec![UpstreamConfig {
                base_url: format!("http://{}/v1", p_addr),
                auth: UpstreamAuth {
                    auth_token: None,
                    auth_token_env: None,
                    api_key: None,
                    api_key_env: None,
                },
                tags: {
                    let mut t = HashMap::new();
                    t.insert("provider_id".to_string(), "primary".to_string());
                    t
                },
                supported_models: HashMap::new(),
                model_mapping: HashMap::new(),
            }],
        },
    );
    mgr.configs.insert(
        "backup".to_string(),
        ServiceConfig {
            name: "backup".to_string(),
            alias: None,
            enabled: true,
            level: 2,
            upstreams: vec![UpstreamConfig {
                base_url: format!("http://{}/v1", b_addr),
                auth: UpstreamAuth {
                    auth_token: None,
                    auth_token_env: None,
                    api_key: None,
                    api_key_env: None,
                },
                tags: {
                    let mut t = HashMap::new();
                    t.insert("provider_id".to_string(), "backup".to_string());
                    t
                },
                supported_models: HashMap::new(),
                model_mapping: HashMap::new(),
            }],
        },
    );
    let cfg = ProxyConfig {
        version: Some(1),
        codex: mgr,
        claude: ServiceConfigManager::default(),
        retry,
        notify: Default::default(),
        default_service: None,
        ui: UiConfig::default(),
    };
    let proxy = ProxyService::new(
        Client::new(),
        Arc::new(cfg),
        "codex",
        Arc::new(std::sync::Mutex::new(HashMap::new())),
    );
    let app = crate::proxy::router(proxy);
    let (proxy_addr, proxy_handle) = spawn_axum_server(app);
    let client = reqwest::Client::new();
    // Request 1: primary 502s, so the answer must come from backup.
    let resp1 = client
        .post(format!("http://{}/v1/responses", proxy_addr))
        .header("content-type", "application/json")
        .header("accept", "text/event-stream")
        .body(r#"{"model":"gpt","input":"hi"}"#)
        .send()
        .await
        .expect("send");
    assert_eq!(resp1.status(), StatusCode::OK);
    let body1 = resp1.bytes().await.expect("read bytes");
    let body1_s = String::from_utf8_lossy(&body1);
    assert!(
        body1_s.contains(r#""upstream":"backup""#),
        "expected response from backup, got: {body1_s}"
    );
    // Request 2: primary is on cooldown and must not be contacted again.
    let resp2 = client
        .post(format!("http://{}/v1/responses", proxy_addr))
        .header("content-type", "application/json")
        .header("accept", "text/event-stream")
        .body(r#"{"model":"gpt","input":"hi"}"#)
        .send()
        .await
        .expect("send");
    assert_eq!(resp2.status(), StatusCode::OK);
    let body2 = resp2.bytes().await.expect("read bytes");
    let body2_s = String::from_utf8_lossy(&body2);
    assert!(
        body2_s.contains(r#""upstream":"backup""#),
        "expected response from backup, got: {body2_s}"
    );
    // Primary was tried exactly once; backup served both requests.
    assert_eq!(primary_hits.load(Ordering::SeqCst), 1);
    assert_eq!(backup_hits.load(Ordering::SeqCst), 2);
    proxy_handle.abort();
    p_handle.abort();
    b_handle.abort();
}
// Counterpart to the cooldown test above: the same primary/backup two-config
// setup, but with allow_cross_station_before_first_output = false. The guard
// must block the provider-layer failover from crossing configs, so both
// requests fail with primary's 502 and the backup is never contacted.
#[tokio::test]
async fn proxy_multi_config_does_not_cross_station_failover_when_pre_output_guard_disabled() {
    let primary_hits = Arc::new(AtomicUsize::new(0));
    let backup_hits = Arc::new(AtomicUsize::new(0));
    let p_hits = primary_hits.clone();
    // Primary upstream: always 502.
    let primary = axum::Router::new().route(
        "/v1/responses",
        post(move || async move {
            p_hits.fetch_add(1, Ordering::SeqCst);
            (
                StatusCode::BAD_GATEWAY,
                Json(serde_json::json!({ "err": "primary 502" })),
            )
        }),
    );
    let (p_addr, p_handle) = spawn_axum_server(primary);
    let b_hits = backup_hits.clone();
    // Backup upstream: would succeed — the test asserts it is never reached.
    let backup = axum::Router::new().route(
        "/v1/responses",
        post(move || async move {
            b_hits.fetch_add(1, Ordering::SeqCst);
            (
                StatusCode::OK,
                Json(serde_json::json!({ "ok": true, "upstream": "backup" })),
            )
        }),
    );
    let (b_addr, b_handle) = spawn_axum_server(backup);
    // Same retry shape as the cooldown test, except the cross-station guard
    // is disabled (false), which should suppress the config-level failover.
    let retry = RetryConfig {
        upstream: Some(crate::config::RetryLayerConfig {
            max_attempts: Some(1),
            backoff_ms: Some(0),
            backoff_max_ms: Some(0),
            jitter_ms: Some(0),
            on_status: Some("502".to_string()),
            on_class: Some(Vec::new()),
            strategy: Some(RetryStrategy::SameUpstream),
        }),
        provider: Some(crate::config::RetryLayerConfig {
            max_attempts: Some(2),
            backoff_ms: Some(0),
            backoff_max_ms: Some(0),
            jitter_ms: Some(0),
            on_status: Some("502".to_string()),
            on_class: Some(Vec::new()),
            strategy: Some(RetryStrategy::Failover),
        }),
        allow_cross_station_before_first_output: Some(false),
        cloudflare_challenge_cooldown_secs: Some(0),
        cloudflare_timeout_cooldown_secs: Some(0),
        transport_cooldown_secs: Some(60),
        cooldown_backoff_factor: Some(1),
        cooldown_backoff_max_secs: Some(0),
        ..Default::default()
    };
    let mut mgr = ServiceConfigManager {
        active: Some("primary".to_string()),
        ..Default::default()
    };
    mgr.configs.insert(
        "primary".to_string(),
        ServiceConfig {
            name: "primary".to_string(),
            alias: None,
            enabled: true,
            level: 1,
            upstreams: vec![UpstreamConfig {
                base_url: format!("http://{}/v1", p_addr),
                auth: UpstreamAuth {
                    auth_token: None,
                    auth_token_env: None,
                    api_key: None,
                    api_key_env: None,
                },
                tags: {
                    let mut t = HashMap::new();
                    t.insert("provider_id".to_string(), "primary".to_string());
                    t
                },
                supported_models: HashMap::new(),
                model_mapping: HashMap::new(),
            }],
        },
    );
    mgr.configs.insert(
        "backup".to_string(),
        ServiceConfig {
            name: "backup".to_string(),
            alias: None,
            enabled: true,
            level: 2,
            upstreams: vec![UpstreamConfig {
                base_url: format!("http://{}/v1", b_addr),
                auth: UpstreamAuth {
                    auth_token: None,
                    auth_token_env: None,
                    api_key: None,
                    api_key_env: None,
                },
                tags: {
                    let mut t = HashMap::new();
                    t.insert("provider_id".to_string(), "backup".to_string());
                    t
                },
                supported_models: HashMap::new(),
                model_mapping: HashMap::new(),
            }],
        },
    );
    let cfg = ProxyConfig {
        version: Some(1),
        codex: mgr,
        claude: ServiceConfigManager::default(),
        retry,
        notify: Default::default(),
        default_service: None,
        ui: UiConfig::default(),
    };
    let proxy = ProxyService::new(
        Client::new(),
        Arc::new(cfg),
        "codex",
        Arc::new(std::sync::Mutex::new(HashMap::new())),
    );
    let app = crate::proxy::router(proxy);
    let (proxy_addr, proxy_handle) = spawn_axum_server(app);
    let client = reqwest::Client::new();
    // Request 1: fails at primary and must NOT be rescued by backup.
    let resp1 = client
        .post(format!("http://{}/v1/responses", proxy_addr))
        .header("content-type", "application/json")
        .body(r#"{"model":"gpt","input":"hi"}"#)
        .send()
        .await
        .expect("send");
    assert_eq!(resp1.status(), StatusCode::BAD_GATEWAY);
    let body1 = resp1.text().await.expect("read body");
    assert!(
        body1.contains("primary 502"),
        "expected first request to fail at primary, got: {body1}"
    );
    // Request 2: still fails without reaching backup (primary_hits stays at 1,
    // so the primary is not re-contacted either — it is avoided, not retried).
    let resp2 = client
        .post(format!("http://{}/v1/responses", proxy_addr))
        .header("content-type", "application/json")
        .body(r#"{"model":"gpt","input":"hi"}"#)
        .send()
        .await
        .expect("send");
    assert_eq!(resp2.status(), StatusCode::BAD_GATEWAY);
    let body2 = resp2.text().await.expect("read body");
    assert!(
        !body2.contains("backup"),
        "expected second request to stay blocked instead of using backup, got: {body2}"
    );
    assert_eq!(primary_hits.load(Ordering::SeqCst), 1);
    // The backup upstream must never have been contacted.
    assert_eq!(backup_hits.load(Ordering::SeqCst), 0);
    proxy_handle.abort();
    p_handle.abort();
    b_handle.abort();
}
// Scenario: upstream 1 always 502s and upstream 2 would succeed, but both
// retry layers have an EMPTY on_status list, so a 502 is not a retryable
// failure. Expectation: no failover ever happens — both requests go to
// upstream 1, both return 502 to the client, and upstream 2 is never hit
// (the 60s transport cooldown is irrelevant because no transport error or
// avoidance threshold is reached).
#[tokio::test]
async fn proxy_does_not_failover_when_502_is_not_retryable_and_threshold_not_reached() {
    let upstream1_hits = Arc::new(AtomicUsize::new(0));
    let upstream2_hits = Arc::new(AtomicUsize::new(0));
    let u1_hits = upstream1_hits.clone();
    // Upstream 1: always 502.
    let upstream1 = axum::Router::new().route(
        "/v1/responses",
        post(move || async move {
            u1_hits.fetch_add(1, Ordering::SeqCst);
            (
                StatusCode::BAD_GATEWAY,
                Json(serde_json::json!({ "err": "always 502" })),
            )
        }),
    );
    let (u1_addr, u1_handle) = spawn_axum_server(upstream1);
    let u2_hits = upstream2_hits.clone();
    // Upstream 2: healthy SSE stream — must remain untouched in this test.
    let upstream2 = axum::Router::new().route(
        "/v1/responses",
        post(move || {
            u2_hits.fetch_add(1, Ordering::SeqCst);
            async move {
                let s = stream::iter(
                    vec![Bytes::from_static(
                        b"data: {\"ok\":true,\"upstream\":2}\n\n",
                    )]
                    .into_iter()
                    .map(Ok::<Bytes, Infallible>),
                );
                let mut resp = Response::new(Body::from_stream(s));
                *resp.status_mut() = StatusCode::OK;
                resp.headers_mut().insert(
                    axum::http::header::CONTENT_TYPE,
                    HeaderValue::from_static("text/event-stream"),
                );
                resp
            }
        }),
    );
    let (u2_addr, u2_handle) = spawn_axum_server(upstream2);
    let proxy_client = Client::new();
    // on_status is "" in both layers: nothing is classified as retryable.
    let retry = RetryConfig {
        upstream: Some(crate::config::RetryLayerConfig {
            max_attempts: Some(1),
            backoff_ms: Some(0),
            backoff_max_ms: Some(0),
            jitter_ms: Some(0),
            on_status: Some("".to_string()),
            on_class: Some(Vec::new()),
            strategy: Some(RetryStrategy::SameUpstream),
        }),
        provider: Some(crate::config::RetryLayerConfig {
            max_attempts: Some(1),
            backoff_ms: Some(0),
            backoff_max_ms: Some(0),
            jitter_ms: Some(0),
            on_status: Some("".to_string()),
            on_class: Some(Vec::new()),
            strategy: Some(RetryStrategy::Failover),
        }),
        cloudflare_challenge_cooldown_secs: Some(0),
        cloudflare_timeout_cooldown_secs: Some(0),
        transport_cooldown_secs: Some(60),
        cooldown_backoff_factor: Some(1),
        cooldown_backoff_max_secs: Some(0),
        ..Default::default()
    };
    let cfg = make_proxy_config(
        vec![
            UpstreamConfig {
                base_url: format!("http://{}/v1", u1_addr),
                auth: UpstreamAuth {
                    auth_token: None,
                    auth_token_env: None,
                    api_key: None,
                    api_key_env: None,
                },
                tags: HashMap::new(),
                supported_models: HashMap::new(),
                model_mapping: HashMap::new(),
            },
            UpstreamConfig {
                base_url: format!("http://{}/v1", u2_addr),
                auth: UpstreamAuth {
                    auth_token: None,
                    auth_token_env: None,
                    api_key: None,
                    api_key_env: None,
                },
                tags: HashMap::new(),
                supported_models: HashMap::new(),
                model_mapping: HashMap::new(),
            },
        ],
        retry,
    );
    let proxy = ProxyService::new(
        proxy_client,
        Arc::new(cfg),
        "codex",
        Arc::new(std::sync::Mutex::new(HashMap::new())),
    );
    let app = crate::proxy::router(proxy);
    let (proxy_addr, proxy_handle) = spawn_axum_server(app);
    let client = reqwest::Client::new();
    // Both requests are expected to surface upstream 1's 502 unchanged.
    let resp1 = client
        .post(format!("http://{}/v1/responses", proxy_addr))
        .header("content-type", "application/json")
        .header("accept", "text/event-stream")
        .body(r#"{"model":"gpt","input":"hi"}"#)
        .send()
        .await
        .expect("send");
    assert_eq!(resp1.status(), StatusCode::BAD_GATEWAY);
    let resp2 = client
        .post(format!("http://{}/v1/responses", proxy_addr))
        .header("content-type", "application/json")
        .header("accept", "text/event-stream")
        .body(r#"{"model":"gpt","input":"hi"}"#)
        .send()
        .await
        .expect("send");
    assert_eq!(resp2.status(), StatusCode::BAD_GATEWAY);
    // Upstream 1 served (and failed) both requests; upstream 2 never called.
    assert_eq!(upstream1_hits.load(Ordering::SeqCst), 2);
    assert_eq!(upstream2_hits.load(Ordering::SeqCst), 0);
    proxy_handle.abort();
    u1_handle.abort();
    u2_handle.abort();
}
// Scenario: both upstreams always 502. Failover on 502 is enabled with one
// attempt per upstream and no cooldowns. Expectation: the proxy tries each
// upstream exactly once, then stops and returns an aggregated failure summary
// that names both attempts and carries the last upstream's error body.
#[tokio::test]
async fn proxy_retries_each_upstream_once_and_stops_when_all_avoided() {
    let upstream1_hits = Arc::new(AtomicUsize::new(0));
    let upstream2_hits = Arc::new(AtomicUsize::new(0));
    let u1_hits = upstream1_hits.clone();
    // Upstream 1: always 502 with a distinctive body.
    let upstream1 = axum::Router::new().route(
        "/v1/responses",
        post(move || async move {
            u1_hits.fetch_add(1, Ordering::SeqCst);
            (
                StatusCode::BAD_GATEWAY,
                Json(serde_json::json!({ "err": "u1 502" })),
            )
        }),
    );
    let (u1_addr, u1_handle) = spawn_axum_server(upstream1);
    let u2_hits = upstream2_hits.clone();
    // Upstream 2: also always 502 — its body should appear as last_error.
    let upstream2 = axum::Router::new().route(
        "/v1/responses",
        post(move || async move {
            u2_hits.fetch_add(1, Ordering::SeqCst);
            (
                StatusCode::BAD_GATEWAY,
                Json(serde_json::json!({ "err": "u2 502" })),
            )
        }),
    );
    let (u2_addr, u2_handle) = spawn_axum_server(upstream2);
    let proxy_client = Client::new();
    // One attempt per layer, failover on 502; all cooldowns disabled so the
    // stop condition is attempt exhaustion, not cooldown timing.
    let retry = RetryConfig {
        upstream: Some(crate::config::RetryLayerConfig {
            max_attempts: Some(1),
            backoff_ms: Some(0),
            backoff_max_ms: Some(0),
            jitter_ms: Some(0),
            on_status: Some("502".to_string()),
            on_class: Some(Vec::new()),
            strategy: Some(RetryStrategy::SameUpstream),
        }),
        provider: Some(crate::config::RetryLayerConfig {
            max_attempts: Some(1),
            backoff_ms: Some(0),
            backoff_max_ms: Some(0),
            jitter_ms: Some(0),
            on_status: Some("502".to_string()),
            on_class: Some(Vec::new()),
            strategy: Some(RetryStrategy::Failover),
        }),
        cloudflare_challenge_cooldown_secs: Some(0),
        cloudflare_timeout_cooldown_secs: Some(0),
        transport_cooldown_secs: Some(0),
        cooldown_backoff_factor: Some(1),
        cooldown_backoff_max_secs: Some(0),
        ..Default::default()
    };
    let cfg = make_proxy_config(
        vec![
            UpstreamConfig {
                base_url: format!("http://{}/v1", u1_addr),
                auth: UpstreamAuth {
                    auth_token: None,
                    auth_token_env: None,
                    api_key: None,
                    api_key_env: None,
                },
                tags: HashMap::new(),
                supported_models: HashMap::new(),
                model_mapping: HashMap::new(),
            },
            UpstreamConfig {
                base_url: format!("http://{}/v1", u2_addr),
                auth: UpstreamAuth {
                    auth_token: None,
                    auth_token_env: None,
                    api_key: None,
                    api_key_env: None,
                },
                tags: HashMap::new(),
                supported_models: HashMap::new(),
                model_mapping: HashMap::new(),
            },
        ],
        retry,
    );
    let proxy = ProxyService::new(
        proxy_client,
        Arc::new(cfg),
        "codex",
        Arc::new(std::sync::Mutex::new(HashMap::new())),
    );
    let app = crate::proxy::router(proxy);
    let (proxy_addr, proxy_handle) = spawn_axum_server(app);
    let client = reqwest::Client::new();
    let resp = client
        .post(format!("http://{}/v1/responses", proxy_addr))
        .header("content-type", "application/json")
        .header("accept", "text/event-stream")
        .body(r#"{"model":"gpt","input":"hi"}"#)
        .send()
        .await
        .expect("send");
    assert_eq!(resp.status(), StatusCode::BAD_GATEWAY);
    let body = resp.text().await.expect("read body");
    // The aggregated summary must list every attempt and the final error.
    assert!(
        body.contains("all upstream attempts failed"),
        "expected aggregated failure summary, got: {body}"
    );
    assert!(
        body.contains("upstream[0]") && body.contains("upstream[1]"),
        "expected both upstream attempts in failure summary, got: {body}"
    );
    assert!(
        body.contains("last_error:") && body.contains("u2 502"),
        "expected final upstream error body in failure summary, got: {body}"
    );
    // Exactly one attempt per upstream, no extra retries.
    assert_eq!(upstream1_hits.load(Ordering::SeqCst), 1);
    assert_eq!(upstream2_hits.load(Ordering::SeqCst), 1);
    proxy_handle.abort();
    u1_handle.abort();
    u2_handle.abort();
}
// A request with a single upstream that fails must still persist its route
// attempts into the finished-request log, so failure diagnostics survive.
#[tokio::test]
async fn failed_single_attempt_records_route_attempts_for_logs() {
    // The only upstream unconditionally answers 502.
    let failing = axum::Router::new().route(
        "/v1/responses",
        post(move || async move {
            (
                StatusCode::BAD_GATEWAY,
                Json(serde_json::json!({ "err": "single 502" })),
            )
        }),
    );
    let (addr, upstream_srv) = spawn_axum_server(failing);
    // Single attempt at each layer: no same-upstream retry, no failover target.
    let upstream_layer = crate::config::RetryLayerConfig {
        max_attempts: Some(1),
        backoff_ms: Some(0),
        backoff_max_ms: Some(0),
        jitter_ms: Some(0),
        on_status: Some("502".to_string()),
        on_class: Some(Vec::new()),
        strategy: Some(RetryStrategy::SameUpstream),
    };
    let provider_layer = crate::config::RetryLayerConfig {
        max_attempts: Some(1),
        backoff_ms: Some(0),
        backoff_max_ms: Some(0),
        jitter_ms: Some(0),
        on_status: Some("502".to_string()),
        on_class: Some(Vec::new()),
        strategy: Some(RetryStrategy::Failover),
    };
    let retry_cfg = RetryConfig {
        upstream: Some(upstream_layer),
        provider: Some(provider_layer),
        cloudflare_challenge_cooldown_secs: Some(0),
        cloudflare_timeout_cooldown_secs: Some(0),
        transport_cooldown_secs: Some(0),
        cooldown_backoff_factor: Some(1),
        cooldown_backoff_max_secs: Some(0),
        ..Default::default()
    };
    let cfg = make_proxy_config(
        vec![UpstreamConfig {
            base_url: format!("http://{}/v1", addr),
            auth: UpstreamAuth {
                auth_token: None,
                auth_token_env: None,
                api_key: None,
                api_key_env: None,
            },
            // provider_id tag identifies this upstream in the route attempts.
            tags: HashMap::from([("provider_id".to_string(), "solo".to_string())]),
            supported_models: HashMap::new(),
            model_mapping: HashMap::new(),
        }],
        retry_cfg,
    );
    let proxy = ProxyService::new(
        Client::new(),
        Arc::new(cfg),
        "codex",
        Arc::new(std::sync::Mutex::new(HashMap::new())),
    );
    // Keep a handle on shared state to inspect the finished-request log later.
    let state = proxy.state.clone();
    let (proxy_addr, proxy_srv) = spawn_axum_server(crate::proxy::router(proxy));
    let resp = reqwest::Client::new()
        .post(format!("http://{}/v1/responses", proxy_addr))
        .header("content-type", "application/json")
        .body(r#"{"model":"gpt","input":"hi"}"#)
        .send()
        .await
        .expect("send");
    assert_eq!(resp.status(), StatusCode::BAD_GATEWAY);
    let body = resp.text().await.expect("read body");
    assert!(
        body.contains("all upstream attempts failed") && body.contains("single 502"),
        "expected single-attempt failure summary, got: {body}"
    );
    // Even a request that never succeeded must record its route attempt.
    let finished = state.list_recent_finished(1).await;
    let retry = finished
        .first()
        .and_then(|request| request.retry.as_ref())
        .expect("failed single attempt should keep route attempts in request logs");
    assert_eq!(retry.attempts, 1);
    assert_eq!(retry.route_attempts.len(), 1);
    assert_eq!(retry.route_attempts[0].decision, "failed_status");
    assert_eq!(retry.route_attempts[0].provider_id.as_deref(), Some("solo"));
    assert_eq!(retry.route_attempts[0].status_code, Some(502));
    proxy_srv.abort();
    upstream_srv.abort();
}