use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::sync::OnceLock;
#[cfg(test)]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;
use axum::body::{Body, Bytes};
use axum::http::{HeaderMap, Method, Response, StatusCode, Uri};
use crate::config::{RetryStrategy, RoutingAffinityPolicyV5};
use crate::lb::{CooldownBackoff, LoadBalancer};
use crate::logging::{BodyPreview, HeaderEntry, RouteAttemptLog, ServiceTierLog, log_retry_trace};
use crate::routing_ir::{
RoutePlanAttemptState, RoutePlanExecutor, RoutePlanRuntimeState, RoutePlanSkipReason,
SelectedRouteCandidate, SkippedRouteCandidate, SkippedStationRouteCandidate,
compile_legacy_route_plan_template,
};
use crate::state::SessionBinding;
use super::ProxyService;
use super::attempt_execution::{
ExecuteSelectedUpstreamParams, SelectedUpstreamExecutionOutcome, execute_selected_upstream,
};
use super::attempt_selection::station_upstreams_exhausted;
use super::attempt_target::AttemptTarget;
use super::provider_orchestration::{
CrossStationFailoverBlockedParams, cross_station_failover_enabled,
log_cross_station_failover_blocked, log_same_station_failover_trace,
next_provider_load_balancer, provider_attempt_limit, station_loop_action_after_attempt,
};
use super::request_preparation::RequestFlavor;
use super::request_routing::RequestRouteSelection;
use super::retry::{RetryPlan, backoff_sleep};
use super::route_affinity::apply_session_route_affinity_to_runtime;
use super::route_attempts::{UnsupportedModelSkipParams, record_unsupported_model_skip};
use super::route_executor_runtime::route_plan_runtime_state_from_lbs_with_overrides;
/// Per-request inputs for `execute_provider_chain_with_route_executor`.
///
/// `route_selection` chooses between the route-graph path and the legacy
/// station-by-station path. `plan` supplies both the per-upstream retry options
/// (`plan.upstream`) and the provider/station-level retry options (`plan.route`).
/// The remaining fields are request context that the executor passes through to
/// every upstream attempt.
pub(super) struct ExecuteProviderChainParams<'a> {
    /// Proxy service; its `state` and `service_name` are used for runtime lookups and logging.
    pub(super) proxy: &'a ProxyService,
    /// Routing model to execute: `RouteGraph { template }` or `Legacy { lbs }`.
    pub(super) route_selection: &'a RequestRouteSelection,
    // Incoming client request (method, URI, headers), forwarded to each attempt.
    pub(super) method: &'a Method,
    pub(super) uri: &'a Uri,
    pub(super) client_headers: &'a HeaderMap,
    /// Lazily-built header entries, shared across attempts so they are built at most once.
    pub(super) client_headers_entries_cache: &'a OnceLock<Vec<HeaderEntry>>,
    pub(super) client_uri: &'a str,
    // Request timing, forwarded to each attempt.
    pub(super) start: &'a Instant,
    pub(super) started_at_ms: u64,
    pub(super) request_id: u64,
    pub(super) request_body_len: usize,
    /// Body bytes sent to whichever upstream is attempted.
    pub(super) body_for_upstream: &'a Bytes,
    /// Model requested by the client; used to skip candidates that do not support it.
    pub(super) request_model: Option<&'a str>,
    // Session and override context, forwarded to each attempt.
    pub(super) session_binding: Option<&'a SessionBinding>,
    pub(super) session_override_config: Option<&'a str>,
    pub(super) global_station_override: Option<&'a str>,
    pub(super) override_model: Option<&'a str>,
    pub(super) override_effort: Option<&'a str>,
    pub(super) override_service_tier: Option<&'a str>,
    pub(super) effective_effort: Option<&'a str>,
    pub(super) effective_service_tier: Option<&'a str>,
    pub(super) base_service_tier: &'a ServiceTierLog,
    /// Session identifier; on the route-graph path it is used to apply session route affinity.
    pub(super) session_id: Option<&'a str>,
    pub(super) cwd: Option<&'a str>,
    pub(super) request_flavor: &'a RequestFlavor,
    // Body-preview / logging limits, forwarded to each attempt.
    pub(super) request_body_previews: bool,
    pub(super) debug_max: usize,
    pub(super) warn_max: usize,
    pub(super) client_body_debug: Option<&'a BodyPreview>,
    pub(super) client_body_warn: Option<&'a BodyPreview>,
    /// Retry plan: `plan.upstream` for per-upstream retries, `plan.route` for station passes.
    pub(super) plan: &'a RetryPlan,
    pub(super) cooldown_backoff: CooldownBackoff,
}
/// Result of `execute_provider_chain_with_route_executor`.
pub(super) enum ProviderExecutionOutcome {
    /// An upstream attempt (via `execute_selected_upstream`) produced a response that
    /// ends the chain; the caller should return it as-is.
    Return(Response<Body>),
    /// The candidate loop finished without any attempt returning a response; carries
    /// the attempt history accumulated along the way.
    Exhausted(ProviderExecutionState),
}
/// Attempt history accumulated while walking the provider chain, returned when no
/// attempt produced a response.
pub(super) struct ProviderExecutionState {
    /// Upstream entries appended by the attempt and skip recorders, in attempt order.
    pub(super) upstream_chain: Vec<String>,
    /// Per-attempt log records, including unsupported-model skips.
    pub(super) route_attempts: Vec<RouteAttemptLog>,
    /// Error (status, message) last written by an attempt, if any; presumably the most
    /// recent failure — confirm against `execute_selected_upstream`.
    pub(super) last_err: Option<(StatusCode, String)>,
}
// Test-only counter, incremented at the start of every
// `execute_provider_chain_with_route_executor` call. It is a process-wide static,
// so every test running in the same binary shares it.
#[cfg(test)]
static ROUTE_EXECUTOR_REQUEST_PATH_TEST_INVOCATIONS: AtomicUsize = AtomicUsize::new(0);
/// Returns how many times `execute_provider_chain_with_route_executor` has been
/// entered in this process (test builds only).
#[cfg(test)]
pub(super) fn route_executor_request_path_test_invocations() -> usize {
    ROUTE_EXECUTOR_REQUEST_PATH_TEST_INVOCATIONS.load(Ordering::SeqCst)
}
/// Emits a `retry_options` retry-trace event describing the effective retry plan.
///
/// The `upstream` section mirrors `plan.upstream`, the `provider` section mirrors
/// `plan.route`, and the remaining keys are the plan-wide never-retry rules,
/// cooldown settings and the cross-station flag.
pub(super) fn log_retry_options(service_name: &str, request_id: u64, plan: &RetryPlan) {
    let upstream_section = {
        let opts = &plan.upstream;
        serde_json::json!({
            "max_attempts": opts.max_attempts,
            "base_backoff_ms": opts.base_backoff_ms,
            "max_backoff_ms": opts.max_backoff_ms,
            "jitter_ms": opts.jitter_ms,
            "retry_status_ranges": opts.retry_status_ranges,
            "retry_error_classes": opts.retry_error_classes,
            "strategy": retry_strategy_name(opts.strategy),
        })
    };
    let provider_section = {
        let opts = &plan.route;
        serde_json::json!({
            "max_attempts": opts.max_attempts,
            "base_backoff_ms": opts.base_backoff_ms,
            "max_backoff_ms": opts.max_backoff_ms,
            "jitter_ms": opts.jitter_ms,
            "retry_status_ranges": opts.retry_status_ranges,
            "retry_error_classes": opts.retry_error_classes,
            "strategy": retry_strategy_name(opts.strategy),
        })
    };

    let event = serde_json::json!({
        "event": "retry_options",
        "service": service_name,
        "request_id": request_id,
        "upstream": upstream_section,
        "provider": provider_section,
        "never_status_ranges": plan.never_status_ranges,
        "never_error_classes": plan.never_error_classes,
        "cloudflare_challenge_cooldown_secs": plan.cloudflare_challenge_cooldown_secs,
        "cloudflare_timeout_cooldown_secs": plan.cloudflare_timeout_cooldown_secs,
        "transport_cooldown_secs": plan.transport_cooldown_secs,
        "cooldown_backoff_factor": plan.cooldown_backoff_factor,
        "cooldown_backoff_max_secs": plan.cooldown_backoff_max_secs,
        "allow_cross_station_before_first_output": plan.allow_cross_station_before_first_output,
    });
    log_retry_trace(event);
}
/// Runs one request through the configured routing candidates until an attempt
/// produces a response.
///
/// - `RequestRouteSelection::RouteGraph`: the route-plan template is executed
///   directly, after session route affinity has been applied to the runtime
///   state. There is a single provider pass (`provider_attempt` is always 0).
/// - `RequestRouteSelection::Legacy`: a route-plan template is compiled from the
///   legacy load balancers. Stations are tried one at a time, up to the provider
///   attempt limit, with an optional `plan.route` backoff between station passes.
///
/// Returns `ProviderExecutionOutcome::Return` with the first response produced by
/// an attempt. Otherwise returns `ProviderExecutionOutcome::Exhausted`, carrying the
/// accumulated upstream chain, attempt log and last error.
pub(super) async fn execute_provider_chain_with_route_executor(
    params: ExecuteProviderChainParams<'_>,
) -> ProviderExecutionOutcome {
    #[cfg(test)]
    ROUTE_EXECUTOR_REQUEST_PATH_TEST_INVOCATIONS.fetch_add(1, Ordering::SeqCst);

    let ExecuteProviderChainParams {
        proxy,
        route_selection,
        method,
        uri,
        client_headers,
        client_headers_entries_cache,
        client_uri,
        start,
        started_at_ms,
        request_id,
        request_body_len,
        body_for_upstream,
        request_model,
        session_binding,
        session_override_config,
        global_station_override,
        override_model,
        override_effort,
        override_service_tier,
        effective_effort,
        effective_service_tier,
        base_service_tier,
        session_id,
        cwd,
        request_flavor,
        request_body_previews,
        debug_max,
        warn_max,
        client_body_debug,
        client_body_warn,
        plan,
        cooldown_backoff,
    } = params;
    let route_opts = &plan.route;

    match route_selection {
        RequestRouteSelection::RouteGraph { template } => {
            let graph_executor = RoutePlanExecutor::new(template);
            let graph_key = template.route_graph_key();
            let candidate_count = template.candidates.len();

            // Start from the shared provider-endpoint runtime state, then layer this
            // session's route affinity on top before any candidate is selected.
            let mut runtime = proxy
                .state
                .route_plan_runtime_state_for_provider_endpoints(proxy.service_name)
                .await;
            apply_session_route_affinity_to_runtime(
                proxy,
                session_id,
                template,
                graph_key.as_str(),
                &mut runtime,
            )
            .await;

            let mut route_state = RoutePlanAttemptState::default();
            let mut attempt_counter: u32 = 0;
            let mut latest_error: Option<(StatusCode, String)> = None;
            let mut chain: Vec<String> = Vec::new();
            let mut attempts: Vec<RouteAttemptLog> = Vec::new();

            let graph_result = execute_route_graph_candidates_with_route_executor(
                ExecuteRouteGraphExecutorParams {
                    proxy,
                    method,
                    uri,
                    client_headers,
                    client_headers_entries_cache,
                    client_uri,
                    start,
                    started_at_ms,
                    request_id,
                    request_body_len,
                    body_for_upstream,
                    request_model,
                    session_binding,
                    session_override_config,
                    global_station_override,
                    override_model,
                    override_effort,
                    override_service_tier,
                    effective_effort,
                    effective_service_tier,
                    base_service_tier,
                    session_id,
                    cwd,
                    request_flavor,
                    request_body_previews,
                    debug_max,
                    warn_max,
                    client_body_debug,
                    client_body_warn,
                    plan,
                    route_graph_key: Some(graph_key.as_str()),
                    provider_attempt: 0,
                    total_upstreams: candidate_count,
                    cooldown_backoff,
                    executor: &graph_executor,
                    runtime: &runtime,
                    route_state: &mut route_state,
                    global_attempt: &mut attempt_counter,
                    last_err: &mut latest_error,
                    upstream_chain: &mut chain,
                    route_attempts: &mut attempts,
                },
            )
            .await;

            match graph_result {
                Some(response) => ProviderExecutionOutcome::Return(response),
                None => ProviderExecutionOutcome::Exhausted(ProviderExecutionState {
                    upstream_chain: chain,
                    route_attempts: attempts,
                    last_err: latest_error,
                }),
            }
        }
        RequestRouteSelection::Legacy { lbs } => {
            let legacy_template = compile_legacy_route_plan_template(
                proxy.service_name,
                lbs.iter().map(|lb| lb.service.as_ref()),
            );
            let legacy_executor = RoutePlanExecutor::new(&legacy_template);
            let upstream_count: usize = lbs.iter().map(|lb| lb.service.upstreams.len()).sum();
            let meta_overrides = proxy
                .state
                .get_upstream_meta_overrides(proxy.service_name)
                .await;
            let runtime = route_plan_runtime_state_from_lbs_with_overrides(
                proxy.service_name,
                lbs,
                &meta_overrides,
            );

            // Whether later stations may be tried after one fails, and how many station
            // passes are allowed, come from the provider-orchestration helpers.
            let strict_multi_config = lbs.len() > 1;
            let failover_enabled =
                cross_station_failover_enabled(strict_multi_config, plan, route_opts);
            let attempt_limit = provider_attempt_limit(failover_enabled, route_opts.max_attempts);

            // State carried across every station pass of this request.
            let mut route_state = RoutePlanAttemptState::default();
            let mut attempt_counter: u32 = 0;
            let mut latest_error: Option<(StatusCode, String)> = None;
            let mut chain: Vec<String> = Vec::new();
            let mut attempts: Vec<RouteAttemptLog> = Vec::new();
            let mut visited_stations: HashSet<String> = HashSet::new();

            for provider_attempt in 0..attempt_limit {
                let Some(lb) = next_provider_load_balancer(lbs, &visited_stations) else {
                    break;
                };
                let station_name = lb.service.name.clone();

                let station_result = execute_station_upstreams_with_route_executor(
                    ExecuteRouteExecutorStationParams {
                        proxy,
                        lb: &lb,
                        station_name: station_name.as_str(),
                        method,
                        uri,
                        client_headers,
                        client_headers_entries_cache,
                        client_uri,
                        start,
                        started_at_ms,
                        request_id,
                        request_body_len,
                        body_for_upstream,
                        request_model,
                        session_binding,
                        session_override_config,
                        global_station_override,
                        override_model,
                        override_effort,
                        override_service_tier,
                        effective_effort,
                        effective_service_tier,
                        base_service_tier,
                        session_id,
                        cwd,
                        request_flavor,
                        request_body_previews,
                        debug_max,
                        warn_max,
                        client_body_debug,
                        client_body_warn,
                        plan,
                        route_graph_key: None,
                        provider_attempt,
                        total_upstreams: upstream_count,
                        cooldown_backoff,
                        executor: &legacy_executor,
                        runtime: &runtime,
                        route_state: &mut route_state,
                        global_attempt: &mut attempt_counter,
                        last_err: &mut latest_error,
                        upstream_chain: &mut chain,
                        route_attempts: &mut attempts,
                    },
                )
                .await;
                if let Some(response) = station_result {
                    return ProviderExecutionOutcome::Return(response);
                }

                log_cross_station_failover_blocked(CrossStationFailoverBlockedParams {
                    service_name: proxy.service_name,
                    request_id,
                    station_name: station_name.as_str(),
                    strict_multi_config,
                    provider_attempt,
                    cross_station_failover_enabled: failover_enabled,
                    provider_opt: route_opts,
                    provider_attempt_limit: attempt_limit,
                    allow_cross_station_before_first_output: plan
                        .allow_cross_station_before_first_output,
                });
                // The station is exhausted; exclude it from the next selection.
                visited_stations.insert(station_name);

                let another_pass_allowed = provider_attempt + 1 < attempt_limit;
                if route_opts.base_backoff_ms > 0 && another_pass_allowed {
                    backoff_sleep(route_opts, provider_attempt).await;
                }
            }

            ProviderExecutionOutcome::Exhausted(ProviderExecutionState {
                upstream_chain: chain,
                route_attempts: attempts,
                last_err: latest_error,
            })
        }
    }
}
/// Arguments for `execute_station_upstreams_with_route_executor`.
///
/// Bundles the per-request context, the station being tried (`lb` /
/// `station_name`) and the mutable attempt state. The legacy station loop in
/// `execute_provider_chain_with_route_executor` carries that state from one
/// station pass to the next.
struct ExecuteRouteExecutorStationParams<'a, 'route> {
    proxy: &'a ProxyService,
    // Station under attempt; at the call site `station_name` is a clone of `lb.service.name`.
    lb: &'a LoadBalancer,
    station_name: &'a str,
    // Client request context, forwarded unchanged to each upstream attempt.
    method: &'a Method,
    uri: &'a Uri,
    client_headers: &'a HeaderMap,
    client_headers_entries_cache: &'a OnceLock<Vec<HeaderEntry>>,
    client_uri: &'a str,
    start: &'a Instant,
    started_at_ms: u64,
    request_id: u64,
    request_body_len: usize,
    body_for_upstream: &'a Bytes,
    request_model: Option<&'a str>,
    session_binding: Option<&'a SessionBinding>,
    session_override_config: Option<&'a str>,
    global_station_override: Option<&'a str>,
    override_model: Option<&'a str>,
    override_effort: Option<&'a str>,
    override_service_tier: Option<&'a str>,
    effective_effort: Option<&'a str>,
    effective_service_tier: Option<&'a str>,
    base_service_tier: &'a ServiceTierLog,
    session_id: Option<&'a str>,
    cwd: Option<&'a str>,
    request_flavor: &'a RequestFlavor,
    request_body_previews: bool,
    debug_max: usize,
    warn_max: usize,
    client_body_debug: Option<&'a BodyPreview>,
    client_body_warn: Option<&'a BodyPreview>,
    plan: &'a RetryPlan,
    // `None` at the legacy call site in this file.
    route_graph_key: Option<&'a str>,
    // Zero-based index of the current station pass.
    provider_attempt: u32,
    // Upstream count summed across all legacy load balancers.
    total_upstreams: usize,
    cooldown_backoff: CooldownBackoff,
    // Executor over the legacy-compiled template plus the runtime state built from the load balancers.
    executor: &'a RoutePlanExecutor<'route>,
    runtime: &'a RoutePlanRuntimeState,
    // Mutable state shared by every station pass of one request.
    route_state: &'a mut RoutePlanAttemptState,
    global_attempt: &'a mut u32,
    last_err: &'a mut Option<(StatusCode, String)>,
    upstream_chain: &'a mut Vec<String>,
    route_attempts: &'a mut Vec<RouteAttemptLog>,
}
/// Arguments for `execute_route_graph_candidates_with_route_executor`.
///
/// Carries the same request context and mutable attempt state as
/// `ExecuteRouteExecutorStationParams`, minus the station-specific `lb` and
/// `station_name`: route-graph execution selects across the whole template.
struct ExecuteRouteGraphExecutorParams<'a, 'route> {
    proxy: &'a ProxyService,
    // Client request context, forwarded unchanged to each upstream attempt.
    method: &'a Method,
    uri: &'a Uri,
    client_headers: &'a HeaderMap,
    client_headers_entries_cache: &'a OnceLock<Vec<HeaderEntry>>,
    client_uri: &'a str,
    start: &'a Instant,
    started_at_ms: u64,
    request_id: u64,
    request_body_len: usize,
    body_for_upstream: &'a Bytes,
    request_model: Option<&'a str>,
    session_binding: Option<&'a SessionBinding>,
    session_override_config: Option<&'a str>,
    global_station_override: Option<&'a str>,
    override_model: Option<&'a str>,
    override_effort: Option<&'a str>,
    override_service_tier: Option<&'a str>,
    effective_effort: Option<&'a str>,
    effective_service_tier: Option<&'a str>,
    base_service_tier: &'a ServiceTierLog,
    session_id: Option<&'a str>,
    cwd: Option<&'a str>,
    request_flavor: &'a RequestFlavor,
    request_body_previews: bool,
    debug_max: usize,
    warn_max: usize,
    client_body_debug: Option<&'a BodyPreview>,
    client_body_warn: Option<&'a BodyPreview>,
    plan: &'a RetryPlan,
    // At the call site in this file: `Some(template.route_graph_key())`.
    route_graph_key: Option<&'a str>,
    // Always 0 at the route-graph call site (single provider pass).
    provider_attempt: u32,
    // Number of candidates in the route-plan template.
    total_upstreams: usize,
    cooldown_backoff: CooldownBackoff,
    // Executor over the route-graph template plus the affinity-adjusted runtime state.
    executor: &'a RoutePlanExecutor<'route>,
    runtime: &'a RoutePlanRuntimeState,
    // Mutable per-request attempt state, updated as candidates are tried.
    route_state: &'a mut RoutePlanAttemptState,
    global_attempt: &'a mut u32,
    last_err: &'a mut Option<(StatusCode, String)>,
    upstream_chain: &'a mut Vec<String>,
    route_attempts: &'a mut Vec<RouteAttemptLog>,
}
/// Tries route-graph candidates, in the order the executor selects them, until one
/// returns a response.
///
/// Each iteration does the following:
/// 1. Asks the executor for the next supported candidate, given the attempt state
///    and runtime.
/// 2. Records any unsupported-model skips.
/// 3. Traces why higher-priority groups were bypassed.
/// 4. Executes the chosen candidate.
///
/// If the chosen candidate's index ends up in the attempt's avoid set, it is marked
/// avoided in `route_state` before the next selection.
///
/// Returns `Some(response)` when an attempt ends the request, or `None` once the
/// executor has no selectable candidate left.
async fn execute_route_graph_candidates_with_route_executor(
    params: ExecuteRouteGraphExecutorParams<'_, '_>,
) -> Option<Response<Body>> {
    let ExecuteRouteGraphExecutorParams {
        proxy,
        method,
        uri,
        client_headers,
        client_headers_entries_cache,
        client_uri,
        start,
        started_at_ms,
        request_id,
        request_body_len,
        body_for_upstream,
        request_model,
        session_binding,
        session_override_config,
        global_station_override,
        override_model,
        override_effort,
        override_service_tier,
        effective_effort,
        effective_service_tier,
        base_service_tier,
        session_id,
        cwd,
        request_flavor,
        request_body_previews,
        debug_max,
        warn_max,
        client_body_debug,
        client_body_warn,
        plan,
        route_graph_key,
        provider_attempt,
        total_upstreams,
        cooldown_backoff,
        executor,
        runtime,
        route_state,
        global_attempt,
        last_err,
        upstream_chain,
        route_attempts,
    } = params;

    loop {
        let selection = executor.select_supported_candidate_with_runtime_state(
            route_state,
            runtime,
            request_model,
        );
        record_executor_unsupported_model_skips(
            proxy.service_name,
            upstream_chain,
            route_attempts,
            &selection.skipped,
            provider_attempt,
            plan.route.max_attempts,
        );

        // Copy what we need before `selected` is moved out of the selection.
        let prior_avoided = selection.avoided_candidate_indices.clone();
        let mut avoided_total = selection.avoided_total;
        let Some(chosen) = selection.selected else {
            return None;
        };

        log_route_graph_selection_explain(
            proxy.service_name,
            request_id,
            executor,
            runtime,
            route_state,
            request_model,
            &chosen,
        );

        let candidate = chosen.candidate;
        let mut avoid_set = hash_set_from_indices(&prior_avoided);
        let target = AttemptTarget::from_candidate(proxy.service_name, candidate);
        let outcome = execute_selected_upstream(ExecuteSelectedUpstreamParams {
            proxy,
            legacy_lb: None,
            target: &target,
            method,
            uri,
            client_headers,
            client_headers_entries_cache,
            client_uri,
            start,
            started_at_ms,
            request_id,
            request_body_len,
            body_for_upstream,
            request_model,
            session_binding,
            session_override_config,
            global_station_override,
            override_model,
            override_effort,
            override_service_tier,
            effective_effort,
            effective_service_tier,
            base_service_tier,
            session_id,
            cwd,
            request_flavor,
            request_body_previews,
            debug_max,
            warn_max,
            client_body_debug,
            client_body_warn,
            plan,
            route_graph_key,
            upstream_opt: &plan.upstream,
            provider_opt: &plan.route,
            provider_attempt,
            total_upstreams,
            cooldown_backoff,
            global_attempt,
            avoid_set: &mut avoid_set,
            avoided_total: &mut avoided_total,
            last_err,
            upstream_chain,
            route_attempts,
        })
        .await;
        // The only other outcome is `ContinueStation`, which falls through to the next selection.
        if let SelectedUpstreamExecutionOutcome::Return(response) = outcome {
            return Some(response);
        }

        // Propagate the attempt's avoid decision for this candidate into the shared state.
        if avoid_set.contains(&candidate.stable_index) {
            route_state.avoid_candidate(executor.template(), candidate);
        }
        debug_assert_eq!(route_state.avoided_total(), avoided_total);
    }
}
/// Tries the upstreams of a single legacy station until one returns a response.
///
/// Before each selection, the loop stops if every upstream of the station is
/// already avoided; a same-station failover trace is logged in that case. After
/// each non-returning attempt, the attempt's avoid set is merged into
/// `route_state`, and `station_loop_action_after_attempt` decides whether to stop.
///
/// Returns `Some(response)` when an attempt ends the request, or `None` when the
/// station has nothing left to try.
async fn execute_station_upstreams_with_route_executor(
    params: ExecuteRouteExecutorStationParams<'_, '_>,
) -> Option<Response<Body>> {
    let ExecuteRouteExecutorStationParams {
        proxy,
        lb,
        station_name,
        method,
        uri,
        client_headers,
        client_headers_entries_cache,
        client_uri,
        start,
        started_at_ms,
        request_id,
        request_body_len,
        body_for_upstream,
        request_model,
        session_binding,
        session_override_config,
        global_station_override,
        override_model,
        override_effort,
        override_service_tier,
        effective_effort,
        effective_service_tier,
        base_service_tier,
        session_id,
        cwd,
        request_flavor,
        request_body_previews,
        debug_max,
        warn_max,
        client_body_debug,
        client_body_warn,
        plan,
        route_graph_key,
        provider_attempt,
        total_upstreams,
        cooldown_backoff,
        executor,
        runtime,
        route_state,
        global_attempt,
        last_err,
        upstream_chain,
        route_attempts,
    } = params;

    // `lb` is only borrowed immutably here, so its upstream count cannot change.
    let upstream_total = lb.service.upstreams.len();

    loop {
        let already_avoided =
            hash_set_from_indices(&route_state.avoid_for_station_name(station_name));
        if station_upstreams_exhausted(upstream_total, &already_avoided) {
            log_same_station_failover_trace(
                proxy.service_name,
                request_id,
                station_name,
                upstream_total,
                &already_avoided,
                true,
            );
            return None;
        }

        let selection = executor.select_supported_station_candidate_with_runtime_state(
            route_state,
            runtime,
            station_name,
            request_model,
        );
        record_executor_station_unsupported_model_skips(
            proxy.service_name,
            upstream_chain,
            route_attempts,
            &selection.skipped,
            provider_attempt,
            plan.route.max_attempts,
        );

        // Copy what we need before `selected` is moved out of the selection.
        let station_avoided = selection.avoid_for_station.clone();
        let mut avoided_total = selection.avoided_total;
        let Some(chosen) = selection.selected else {
            return None;
        };

        let upstream = chosen.selected_upstream;
        let chosen_station = upstream.station_name.clone();
        let mut avoid_set = hash_set_from_indices(&station_avoided);
        let target = AttemptTarget::legacy(upstream.clone());
        let outcome = execute_selected_upstream(ExecuteSelectedUpstreamParams {
            proxy,
            legacy_lb: Some(lb),
            target: &target,
            method,
            uri,
            client_headers,
            client_headers_entries_cache,
            client_uri,
            start,
            started_at_ms,
            request_id,
            request_body_len,
            body_for_upstream,
            request_model,
            session_binding,
            session_override_config,
            global_station_override,
            override_model,
            override_effort,
            override_service_tier,
            effective_effort,
            effective_service_tier,
            base_service_tier,
            session_id,
            cwd,
            request_flavor,
            request_body_previews,
            debug_max,
            warn_max,
            client_body_debug,
            client_body_warn,
            plan,
            route_graph_key,
            upstream_opt: &plan.upstream,
            provider_opt: &plan.route,
            provider_attempt,
            total_upstreams,
            cooldown_backoff,
            global_attempt,
            avoid_set: &mut avoid_set,
            avoided_total: &mut avoided_total,
            last_err,
            upstream_chain,
            route_attempts,
        })
        .await;
        // The only other outcome is `ContinueStation`, which keeps working this station.
        if let SelectedUpstreamExecutionOutcome::Return(response) = outcome {
            return Some(response);
        }

        sync_route_state_from_avoid_set(route_state, chosen_station.as_str(), &avoid_set);
        debug_assert_eq!(route_state.avoided_total(), avoided_total);

        let station_done = station_loop_action_after_attempt(
            proxy.service_name,
            request_id,
            chosen_station.as_str(),
            upstream_total,
            &avoid_set,
        );
        if station_done {
            return None;
        }
    }
}
/// Emits a `route_graph_selection_explain` trace when the selected candidate is not
/// in preference group 0.
///
/// The trace lists every candidate from a lower-numbered preference group, together
/// with the reasons it was passed over. Reasons come from two sources:
/// - the attempt state, reported as `attempt_avoided`;
/// - the executor's runtime skip explanation, matched by provider-endpoint stable key.
///
/// A candidate with neither kind of reason is reported as `not_selected`. Nothing is
/// logged when no such lower-numbered candidate exists.
fn log_route_graph_selection_explain(
    service_name: &str,
    request_id: u64,
    executor: &RoutePlanExecutor<'_>,
    runtime: &RoutePlanRuntimeState,
    route_state: &RoutePlanAttemptState,
    request_model: Option<&str>,
    selected: &SelectedRouteCandidate<'_>,
) {
    let selected_group = selected.candidate.preference_group;
    if selected_group == 0 {
        return;
    }

    let template = executor.template();
    let selected_key = selected.provider_endpoint.stable_key();

    // Runtime skip-reason codes keyed by provider-endpoint stable key. A later entry
    // for the same key replaces an earlier one.
    let mut runtime_codes_by_key = BTreeMap::new();
    for skip in executor.explain_candidate_skip_reasons_with_runtime_state(runtime, request_model)
    {
        let codes: Vec<_> = skip.reasons.iter().map(RoutePlanSkipReason::code).collect();
        runtime_codes_by_key.insert(skip.provider_endpoint.stable_key(), codes);
    }

    let mut skipped_groups = BTreeSet::new();
    let mut skipped_candidates = Vec::new();
    let higher_priority = executor
        .iter_candidates()
        .filter(|candidate| candidate.preference_group < selected_group);
    for candidate in higher_priority {
        let endpoint_key = template
            .candidate_provider_endpoint_key(candidate)
            .stable_key();

        let mut reasons = BTreeSet::new();
        if route_state.avoids_candidate(template, candidate) {
            reasons.insert("attempt_avoided");
        }
        if let Some(codes) = runtime_codes_by_key.get(endpoint_key.as_str()) {
            reasons.extend(codes.iter().copied());
        }
        if reasons.is_empty() {
            reasons.insert("not_selected");
        }

        skipped_groups.insert(candidate.preference_group);
        skipped_candidates.push(serde_json::json!({
            "provider_id": candidate.provider_id.as_str(),
            "endpoint_id": candidate.endpoint_id.as_str(),
            "provider_endpoint_key": endpoint_key,
            "preference_group": candidate.preference_group,
            "route_path": &candidate.route_path,
            "reasons": reasons.into_iter().collect::<Vec<_>>(),
        }));
    }

    if skipped_candidates.is_empty() {
        return;
    }

    let affinity_key = runtime
        .affinity_provider_endpoint()
        .map(|key| key.stable_key());
    let selected_matches_affinity = affinity_key
        .as_deref()
        .is_some_and(|key| key == selected_key);

    log_retry_trace(serde_json::json!({
        "event": "route_graph_selection_explain",
        "service": service_name,
        "request_id": request_id,
        "request_model": request_model,
        "affinity": {
            "policy": routing_affinity_policy_trace_label(template.affinity_policy),
            "provider_endpoint_key": affinity_key,
            "selected_matches_affinity": selected_matches_affinity,
        },
        "selected": {
            "provider_id": selected.candidate.provider_id.as_str(),
            "endpoint_id": selected.candidate.endpoint_id.as_str(),
            "provider_endpoint_key": selected_key,
            "preference_group": selected_group,
            "route_path": &selected.candidate.route_path,
        },
        "skipped_higher_priority_groups": skipped_groups.into_iter().collect::<Vec<_>>(),
        "skipped_higher_priority_candidates": skipped_candidates,
    }));
}
fn routing_affinity_policy_trace_label(policy: RoutingAffinityPolicyV5) -> &'static str {
match policy {
RoutingAffinityPolicyV5::Off => "off",
RoutingAffinityPolicyV5::PreferredGroup => "preferred_group",
RoutingAffinityPolicyV5::FallbackSticky => "fallback_sticky",
RoutingAffinityPolicyV5::Hard => "hard",
}
}
/// Records an attempt-log entry for every route-graph candidate the executor skipped
/// because it does not support the requested model.
///
/// Candidates skipped for any other reason are ignored here.
fn record_executor_unsupported_model_skips(
    service_name: &str,
    upstream_chain: &mut Vec<String>,
    route_attempts: &mut Vec<RouteAttemptLog>,
    skipped: &[SkippedRouteCandidate<'_>],
    provider_attempt: u32,
    provider_max_attempts: u32,
) {
    let unsupported = skipped.iter().filter_map(|entry| match &entry.reason {
        RoutePlanSkipReason::UnsupportedModel { requested_model } => {
            Some((entry, requested_model))
        }
        _ => None,
    });
    for (entry, requested_model) in unsupported {
        let avoid_set = hash_set_from_indices(&entry.avoided_candidate_indices);
        let target = AttemptTarget::from_candidate(service_name, entry.candidate);
        let skip_params = UnsupportedModelSkipParams {
            target: &target,
            requested_model,
            provider_attempt,
            provider_max_attempts,
            avoid_set: &avoid_set,
            avoided_total: entry.avoided_total,
            total_upstreams: entry.total_upstreams,
        };
        record_unsupported_model_skip(upstream_chain, route_attempts, skip_params);
    }
}
/// Records an attempt-log entry for every station upstream the executor skipped
/// because it does not support the requested model.
///
/// Other skip reasons are ignored. `_service_name` is unused; it keeps the
/// signature parallel to `record_executor_unsupported_model_skips`.
fn record_executor_station_unsupported_model_skips(
    _service_name: &str,
    upstream_chain: &mut Vec<String>,
    route_attempts: &mut Vec<RouteAttemptLog>,
    skipped: &[SkippedStationRouteCandidate<'_>],
    provider_attempt: u32,
    provider_max_attempts: u32,
) {
    let unsupported = skipped.iter().filter_map(|entry| match &entry.reason {
        RoutePlanSkipReason::UnsupportedModel { requested_model } => {
            Some((entry, requested_model))
        }
        _ => None,
    });
    for (entry, requested_model) in unsupported {
        let avoid_set = hash_set_from_indices(&entry.avoid_for_station);
        let target = AttemptTarget::legacy(entry.selected_upstream.clone());
        let skip_params = UnsupportedModelSkipParams {
            target: &target,
            requested_model,
            provider_attempt,
            provider_max_attempts,
            avoid_set: &avoid_set,
            avoided_total: entry.avoided_total,
            total_upstreams: entry.total_upstreams,
        };
        record_unsupported_model_skip(upstream_chain, route_attempts, skip_params);
    }
}
/// Marks every upstream index in `avoid_set` as avoided for `station_name` in the
/// request's route attempt state.
fn sync_route_state_from_avoid_set(
    route_state: &mut RoutePlanAttemptState,
    station_name: &str,
    avoid_set: &HashSet<usize>,
) {
    avoid_set
        .iter()
        .copied()
        .for_each(|upstream_index| route_state.avoid_upstream(station_name, upstream_index));
}
/// Collects a slice of candidate/upstream indices into a `HashSet`, dropping
/// duplicates, for constant-time membership checks.
fn hash_set_from_indices(indices: &[usize]) -> HashSet<usize> {
    let mut set = HashSet::with_capacity(indices.len());
    set.extend(indices);
    set
}
/// Returns the trace label for a retry strategy: `"failover"` for
/// `RetryStrategy::Failover`, and `"same_upstream"` for every other strategy.
fn retry_strategy_name(strategy: RetryStrategy) -> &'static str {
    match strategy {
        RetryStrategy::Failover => "failover",
        _ => "same_upstream",
    }
}