use super::*;
pub(crate) fn runtime_proxy_current_profile(shared: &RuntimeRotationProxyShared) -> Result<String> {
Ok(shared
.runtime
.lock()
.map_err(|_| anyhow::anyhow!("runtime auto-rotate state is poisoned"))?
.current_profile
.clone())
}
pub(crate) fn runtime_profile_in_retry_backoff(
runtime: &RuntimeRotationState,
profile_name: &str,
now: i64,
) -> bool {
runtime
.profile_retry_backoff_until
.get(profile_name)
.copied()
.is_some_and(|until| until > now)
}
pub(crate) fn runtime_profile_in_transport_backoff(
runtime: &RuntimeRotationState,
profile_name: &str,
route_kind: RuntimeRouteKind,
now: i64,
) -> bool {
runtime_profile_transport_backoff_until_from_map(
&runtime.profile_transport_backoff_until,
profile_name,
route_kind,
now,
)
.is_some()
}
pub(crate) fn runtime_profile_inflight_count(
runtime: &RuntimeRotationState,
profile_name: &str,
) -> usize {
runtime
.profile_inflight
.get(profile_name)
.copied()
.unwrap_or(0)
}
pub(crate) fn runtime_profile_inflight_hard_limit_context(context: &str) -> usize {
runtime_profile_inflight_weight(context)
}
pub(crate) fn runtime_profile_inflight_hard_limited_for_context(
shared: &RuntimeRotationProxyShared,
profile_name: &str,
context: &str,
) -> Result<bool> {
let hard_limit = runtime_proxy_profile_inflight_hard_limit();
let runtime = shared
.runtime
.lock()
.map_err(|_| anyhow::anyhow!("runtime auto-rotate state is poisoned"))?;
Ok(runtime_profile_inflight_count(&runtime, profile_name)
.saturating_add(runtime_profile_inflight_hard_limit_context(context))
> hard_limit)
}
pub(crate) fn runtime_profile_in_selection_backoff(
runtime: &RuntimeRotationState,
profile_name: &str,
route_kind: RuntimeRouteKind,
now: i64,
) -> bool {
runtime_profile_in_retry_backoff(runtime, profile_name, now)
|| runtime_profile_in_transport_backoff(runtime, profile_name, route_kind, now)
}
pub(crate) fn runtime_route_kind_label(route_kind: RuntimeRouteKind) -> &'static str {
match route_kind {
RuntimeRouteKind::Responses => "responses",
RuntimeRouteKind::Compact => "compact",
RuntimeRouteKind::Websocket => "websocket",
RuntimeRouteKind::Standard => "standard",
}
}
pub(crate) fn runtime_route_coupled_kinds(
route_kind: RuntimeRouteKind,
) -> &'static [RuntimeRouteKind] {
match route_kind {
RuntimeRouteKind::Responses => &[RuntimeRouteKind::Websocket],
RuntimeRouteKind::Websocket => &[RuntimeRouteKind::Responses],
RuntimeRouteKind::Compact => &[RuntimeRouteKind::Standard],
RuntimeRouteKind::Standard => &[RuntimeRouteKind::Compact],
}
}
pub(crate) fn runtime_profile_effective_health_score(
entry: &RuntimeProfileHealth,
now: i64,
) -> u32 {
runtime_profile_effective_score(entry, now, RUNTIME_PROFILE_HEALTH_DECAY_SECONDS)
}
pub(crate) fn runtime_profile_effective_score(
entry: &RuntimeProfileHealth,
now: i64,
decay_seconds: i64,
) -> u32 {
let decay = now
.saturating_sub(entry.updated_at)
.saturating_div(decay_seconds.max(1))
.clamp(0, i64::from(u32::MAX)) as u32;
entry.score.saturating_sub(decay)
}
pub(crate) fn runtime_profile_effective_health_score_from_map(
profile_health: &BTreeMap<String, RuntimeProfileHealth>,
key: &str,
now: i64,
) -> u32 {
profile_health
.get(key)
.map(|entry| runtime_profile_effective_health_score(entry, now))
.unwrap_or(0)
}
pub(crate) fn runtime_profile_effective_score_from_map(
profile_health: &BTreeMap<String, RuntimeProfileHealth>,
key: &str,
now: i64,
decay_seconds: i64,
) -> u32 {
profile_health
.get(key)
.map(|entry| runtime_profile_effective_score(entry, now, decay_seconds))
.unwrap_or(0)
}
pub(crate) fn runtime_profile_global_health_score(
runtime: &RuntimeRotationState,
profile_name: &str,
now: i64,
) -> u32 {
runtime_profile_effective_health_score_from_map(&runtime.profile_health, profile_name, now)
}
pub(crate) fn runtime_profile_route_health_key(
profile_name: &str,
route_kind: RuntimeRouteKind,
) -> String {
format!(
"__route_health__:{}:{profile_name}",
runtime_route_kind_label(route_kind)
)
}
pub(crate) fn runtime_profile_route_bad_pairing_key(
profile_name: &str,
route_kind: RuntimeRouteKind,
) -> String {
format!(
"__route_bad_pairing__:{}:{profile_name}",
runtime_route_kind_label(route_kind)
)
}
pub(crate) fn runtime_profile_route_success_streak_key(
profile_name: &str,
route_kind: RuntimeRouteKind,
) -> String {
format!(
"__route_success__:{}:{profile_name}",
runtime_route_kind_label(route_kind)
)
}
pub(crate) fn runtime_profile_route_performance_key(
profile_name: &str,
route_kind: RuntimeRouteKind,
) -> String {
format!(
"__route_performance__:{}:{profile_name}",
runtime_route_kind_label(route_kind)
)
}
pub(crate) fn runtime_profile_route_health_score(
runtime: &RuntimeRotationState,
profile_name: &str,
now: i64,
route_kind: RuntimeRouteKind,
) -> u32 {
runtime_profile_effective_health_score_from_map(
&runtime.profile_health,
&runtime_profile_route_health_key(profile_name, route_kind),
now,
)
}
pub(crate) fn runtime_profile_route_coupling_score(
runtime: &RuntimeRotationState,
profile_name: &str,
now: i64,
route_kind: RuntimeRouteKind,
) -> u32 {
runtime_profile_route_coupling_score_from_map(
&runtime.profile_health,
profile_name,
now,
route_kind,
)
}
pub(crate) fn runtime_profile_route_coupling_score_from_map(
profile_health: &BTreeMap<String, RuntimeProfileHealth>,
profile_name: &str,
now: i64,
route_kind: RuntimeRouteKind,
) -> u32 {
runtime_route_coupled_kinds(route_kind)
.iter()
.copied()
.map(|coupled_kind| {
let route_score = runtime_profile_effective_health_score_from_map(
profile_health,
&runtime_profile_route_health_key(profile_name, coupled_kind),
now,
);
let bad_pairing_score = runtime_profile_effective_score_from_map(
profile_health,
&runtime_profile_route_bad_pairing_key(profile_name, coupled_kind),
now,
RUNTIME_PROFILE_BAD_PAIRING_DECAY_SECONDS,
);
route_score
.saturating_add(bad_pairing_score)
.saturating_div(2)
})
.fold(0, u32::saturating_add)
}
pub(crate) fn runtime_profile_route_performance_score(
profile_health: &BTreeMap<String, RuntimeProfileHealth>,
profile_name: &str,
now: i64,
route_kind: RuntimeRouteKind,
) -> u32 {
let route_score = runtime_profile_effective_score_from_map(
profile_health,
&runtime_profile_route_performance_key(profile_name, route_kind),
now,
RUNTIME_PROFILE_PERFORMANCE_DECAY_SECONDS,
);
let coupled_score = runtime_route_coupled_kinds(route_kind)
.iter()
.copied()
.map(|coupled_kind| {
runtime_profile_effective_score_from_map(
profile_health,
&runtime_profile_route_performance_key(profile_name, coupled_kind),
now,
RUNTIME_PROFILE_PERFORMANCE_DECAY_SECONDS,
)
.saturating_div(2)
})
.fold(0, u32::saturating_add);
route_score.saturating_add(coupled_score)
}
pub(crate) fn runtime_profile_health_score(
runtime: &RuntimeRotationState,
profile_name: &str,
now: i64,
route_kind: RuntimeRouteKind,
) -> u32 {
runtime_profile_global_health_score(runtime, profile_name, now)
.saturating_add(runtime_profile_route_health_score(
runtime,
profile_name,
now,
route_kind,
))
.saturating_add(runtime_profile_route_coupling_score(
runtime,
profile_name,
now,
route_kind,
))
}
pub(crate) fn runtime_profile_selection_jitter(
shared: &RuntimeRotationProxyShared,
profile_name: &str,
route_kind: RuntimeRouteKind,
) -> u64 {
let mut hasher = DefaultHasher::new();
shared
.request_sequence
.load(Ordering::Relaxed)
.hash(&mut hasher);
profile_name.hash(&mut hasher);
runtime_route_kind_label(route_kind).hash(&mut hasher);
hasher.finish()
}
pub(crate) fn prune_runtime_profile_retry_backoff(runtime: &mut RuntimeRotationState, now: i64) {
runtime
.profile_retry_backoff_until
.retain(|_, until| *until > now);
}
pub(crate) fn prune_runtime_profile_transport_backoff(
runtime: &mut RuntimeRotationState,
now: i64,
) {
runtime
.profile_transport_backoff_until
.retain(|_, until| *until > now);
}
pub(crate) fn prune_runtime_profile_route_circuits(runtime: &mut RuntimeRotationState, now: i64) {
runtime
.profile_route_circuit_open_until
.retain(|key, until| {
if *until > now {
return true;
}
let health_key = runtime_profile_route_circuit_health_key(key);
runtime_profile_effective_health_score_from_map(
&runtime.profile_health,
&health_key,
now,
) > 0
});
}
pub(crate) fn prune_runtime_profile_selection_backoff(
runtime: &mut RuntimeRotationState,
now: i64,
) {
prune_runtime_profile_retry_backoff(runtime, now);
prune_runtime_profile_transport_backoff(runtime, now);
prune_runtime_profile_route_circuits(runtime, now);
}
pub(crate) fn runtime_profile_name_in_selection_backoff(
profile_name: &str,
retry_backoff_until: &BTreeMap<String, i64>,
transport_backoff_until: &BTreeMap<String, i64>,
route_circuit_open_until: &BTreeMap<String, i64>,
route_kind: RuntimeRouteKind,
now: i64,
) -> bool {
retry_backoff_until
.get(profile_name)
.copied()
.is_some_and(|until| until > now)
|| runtime_profile_transport_backoff_until_from_map(
transport_backoff_until,
profile_name,
route_kind,
now,
)
.is_some()
|| route_circuit_open_until
.get(&runtime_profile_route_circuit_key(profile_name, route_kind))
.copied()
.is_some_and(|until| until > now)
}
pub(crate) fn runtime_profile_backoff_sort_key(
profile_name: &str,
retry_backoff_until: &BTreeMap<String, i64>,
transport_backoff_until: &BTreeMap<String, i64>,
route_circuit_open_until: &BTreeMap<String, i64>,
route_kind: RuntimeRouteKind,
now: i64,
) -> (usize, i64, i64, i64) {
let retry_until = retry_backoff_until
.get(profile_name)
.copied()
.filter(|until| *until > now);
let transport_until = runtime_profile_transport_backoff_until_from_map(
transport_backoff_until,
profile_name,
route_kind,
now,
);
let circuit_until = route_circuit_open_until
.get(&runtime_profile_route_circuit_key(profile_name, route_kind))
.copied()
.filter(|until| *until > now);
match (circuit_until, transport_until, retry_until) {
(None, None, None) => (0, 0, 0, 0),
(Some(circuit_until), None, None) => (1, circuit_until, 0, 0),
(None, Some(transport_until), None) => (2, transport_until, 0, 0),
(None, None, Some(retry_until)) => (3, retry_until, 0, 0),
(Some(circuit_until), Some(transport_until), None) => (
4,
circuit_until.min(transport_until),
circuit_until.max(transport_until),
0,
),
(Some(circuit_until), None, Some(retry_until)) => (
5,
circuit_until.min(retry_until),
circuit_until.max(retry_until),
0,
),
(None, Some(transport_until), Some(retry_until)) => (
6,
transport_until.min(retry_until),
transport_until.max(retry_until),
0,
),
(Some(circuit_until), Some(transport_until), Some(retry_until)) => (
7,
circuit_until.min(transport_until.min(retry_until)),
circuit_until.max(transport_until.max(retry_until)),
retry_until,
),
}
}
pub(crate) fn runtime_profile_health_sort_key(
profile_name: &str,
profile_health: &BTreeMap<String, RuntimeProfileHealth>,
now: i64,
route_kind: RuntimeRouteKind,
) -> u32 {
runtime_profile_effective_health_score_from_map(profile_health, profile_name, now)
.saturating_add(runtime_profile_effective_health_score_from_map(
profile_health,
&runtime_profile_route_health_key(profile_name, route_kind),
now,
))
.saturating_add(runtime_profile_effective_score_from_map(
profile_health,
&runtime_profile_route_bad_pairing_key(profile_name, route_kind),
now,
RUNTIME_PROFILE_BAD_PAIRING_DECAY_SECONDS,
))
.saturating_add(runtime_profile_route_coupling_score_from_map(
profile_health,
profile_name,
now,
route_kind,
))
.saturating_add(runtime_profile_route_performance_score(
profile_health,
profile_name,
now,
route_kind,
))
}
pub(crate) fn runtime_profile_inflight_sort_key(
profile_name: &str,
profile_inflight: &BTreeMap<String, usize>,
) -> usize {
profile_inflight.get(profile_name).copied().unwrap_or(0)
}
pub(crate) fn runtime_profile_inflight_weight(context: &str) -> usize {
match context {
"websocket_session" | "responses_http" => 2,
_ => 1,
}
}
pub(crate) fn runtime_route_kind_inflight_context(route_kind: RuntimeRouteKind) -> &'static str {
match route_kind {
RuntimeRouteKind::Responses => "responses_http",
RuntimeRouteKind::Compact => "compact_http",
RuntimeRouteKind::Websocket => "websocket_session",
RuntimeRouteKind::Standard => "standard_http",
}
}
pub(crate) fn runtime_profile_inflight_soft_limit(
route_kind: RuntimeRouteKind,
pressure_mode: bool,
) -> usize {
let base = runtime_proxy_profile_inflight_soft_limit().max(1);
if !pressure_mode {
return base;
}
match route_kind {
RuntimeRouteKind::Responses | RuntimeRouteKind::Websocket => base.saturating_sub(1).max(1),
RuntimeRouteKind::Compact | RuntimeRouteKind::Standard => base.saturating_sub(2).max(1),
}
}
pub(crate) fn runtime_profile_backoffs_snapshot(
runtime: &RuntimeRotationState,
) -> RuntimeProfileBackoffs {
RuntimeProfileBackoffs {
retry_backoff_until: runtime.profile_retry_backoff_until.clone(),
transport_backoff_until: runtime.profile_transport_backoff_until.clone(),
route_circuit_open_until: runtime.profile_route_circuit_open_until.clone(),
}
}
pub(crate) fn runtime_soften_persisted_backoff_map_for_startup(
backoffs: &mut BTreeMap<String, i64>,
now: i64,
max_future_seconds: i64,
) -> bool {
let max_until = now.saturating_add(max_future_seconds.max(0));
let mut changed = false;
backoffs.retain(|_, until| {
if *until <= now {
changed = true;
return false;
}
let next_until = (*until).min(max_until);
if next_until != *until {
changed = true;
}
*until = next_until;
true
});
changed
}
pub(crate) fn runtime_route_kind_from_label(label: &str) -> Option<RuntimeRouteKind> {
match label {
"responses" => Some(RuntimeRouteKind::Responses),
"compact" => Some(RuntimeRouteKind::Compact),
"websocket" => Some(RuntimeRouteKind::Websocket),
"standard" => Some(RuntimeRouteKind::Standard),
_ => None,
}
}
pub(crate) fn runtime_profile_route_circuit_probe_seconds(
profile_scores: &BTreeMap<String, RuntimeProfileHealth>,
route_profile_key: &str,
now: i64,
) -> i64 {
let Some((route_label, profile_name)) =
runtime_profile_route_key_parts(route_profile_key, "__route_circuit__:")
else {
return RUNTIME_PROFILE_CIRCUIT_HALF_OPEN_PROBE_SECONDS;
};
let Some(route_kind) = runtime_route_kind_from_label(route_label) else {
return RUNTIME_PROFILE_CIRCUIT_HALF_OPEN_PROBE_SECONDS;
};
let score = runtime_profile_effective_health_score_from_map(
profile_scores,
&runtime_profile_route_health_key(profile_name, route_kind),
now,
);
runtime_profile_circuit_half_open_probe_seconds(score)
}
pub(crate) fn runtime_soften_persisted_route_circuits_for_startup(
route_circuit_open_until: &mut BTreeMap<String, i64>,
profile_scores: &BTreeMap<String, RuntimeProfileHealth>,
now: i64,
) -> bool {
let mut changed = false;
route_circuit_open_until.retain(|route_profile_key, until| {
if *until <= now {
changed = true;
return false;
}
let max_until = now.saturating_add(runtime_profile_route_circuit_probe_seconds(
profile_scores,
route_profile_key,
now,
));
let next_until = (*until).min(max_until);
if next_until != *until {
changed = true;
}
*until = next_until;
true
});
changed
}
pub(crate) fn runtime_soften_persisted_backoffs_for_startup(
backoffs: &mut RuntimeProfileBackoffs,
profile_scores: &BTreeMap<String, RuntimeProfileHealth>,
now: i64,
) -> bool {
let mut changed = runtime_soften_persisted_backoff_map_for_startup(
&mut backoffs.transport_backoff_until,
now,
RUNTIME_PROFILE_TRANSPORT_BACKOFF_SECONDS,
);
changed = runtime_soften_persisted_route_circuits_for_startup(
&mut backoffs.route_circuit_open_until,
profile_scores,
now,
) || changed;
changed
}
pub(crate) const RUNTIME_PROFILE_CIRCUIT_OPEN_THRESHOLD: u32 = 4;
pub(crate) const RUNTIME_PROFILE_CIRCUIT_OPEN_SECONDS: i64 = 20;
pub(crate) const RUNTIME_PROFILE_CIRCUIT_OPEN_MAX_SECONDS: i64 = if cfg!(test) { 320 } else { 600 };
pub(crate) const RUNTIME_PROFILE_CIRCUIT_HALF_OPEN_PROBE_SECONDS: i64 = 5;
pub(crate) const RUNTIME_PROFILE_CIRCUIT_HALF_OPEN_PROBE_MAX_SECONDS: i64 =
if cfg!(test) { 20 } else { 60 };
pub(crate) const RUNTIME_PROFILE_CIRCUIT_REOPEN_DECAY_SECONDS: i64 =
if cfg!(test) { 12 } else { 1_800 };
pub(crate) const RUNTIME_PROFILE_CIRCUIT_REOPEN_MAX_STAGE: u32 = 4;
pub(crate) fn runtime_profile_circuit_open_seconds(score: u32, reopen_stage: u32) -> i64 {
let multiplier = 1_i64
.checked_shl(
score
.saturating_sub(RUNTIME_PROFILE_CIRCUIT_OPEN_THRESHOLD)
.min(3)
.saturating_add(reopen_stage.min(RUNTIME_PROFILE_CIRCUIT_REOPEN_MAX_STAGE)),
)
.unwrap_or(i64::MAX);
RUNTIME_PROFILE_CIRCUIT_OPEN_SECONDS
.saturating_mul(multiplier)
.min(RUNTIME_PROFILE_CIRCUIT_OPEN_MAX_SECONDS)
}
pub(crate) fn runtime_profile_circuit_half_open_probe_seconds(score: u32) -> i64 {
let multiplier = 1_i64
.checked_shl(
score
.saturating_sub(RUNTIME_PROFILE_CIRCUIT_OPEN_THRESHOLD)
.min(3),
)
.unwrap_or(i64::MAX);
RUNTIME_PROFILE_CIRCUIT_HALF_OPEN_PROBE_SECONDS
.saturating_mul(multiplier)
.min(RUNTIME_PROFILE_CIRCUIT_HALF_OPEN_PROBE_MAX_SECONDS)
}
pub(crate) fn runtime_profile_route_circuit_key(
profile_name: &str,
route_kind: RuntimeRouteKind,
) -> String {
format!(
"__route_circuit__:{}:{profile_name}",
runtime_route_kind_label(route_kind)
)
}
pub(crate) fn runtime_profile_route_circuit_profile_name(key: &str) -> &str {
key.rsplit(':').next().unwrap_or(key)
}
pub(crate) fn runtime_profile_route_circuit_health_key(key: &str) -> String {
key.replacen("__route_circuit__", "__route_health__", 1)
}
pub(crate) fn runtime_profile_route_circuit_reopen_key(
profile_name: &str,
route_kind: RuntimeRouteKind,
) -> String {
format!(
"__route_circuit_reopen__:{}:{profile_name}",
runtime_route_kind_label(route_kind)
)
}
pub(crate) fn runtime_profile_route_circuit_open_until(
runtime: &RuntimeRotationState,
profile_name: &str,
route_kind: RuntimeRouteKind,
now: i64,
) -> Option<i64> {
runtime
.profile_route_circuit_open_until
.get(&runtime_profile_route_circuit_key(profile_name, route_kind))
.copied()
.filter(|until| *until > now)
}
pub(crate) fn reserve_runtime_profile_route_circuit_half_open_probe(
shared: &RuntimeRotationProxyShared,
profile_name: &str,
route_kind: RuntimeRouteKind,
) -> Result<bool> {
let mut runtime = shared
.runtime
.lock()
.map_err(|_| anyhow::anyhow!("runtime auto-rotate state is poisoned"))?;
let now = Local::now().timestamp();
let key = runtime_profile_route_circuit_key(profile_name, route_kind);
let route_label = runtime_route_kind_label(route_kind);
let Some(until) = runtime.profile_route_circuit_open_until.get(&key).copied() else {
return Ok(true);
};
if until > now {
return Ok(false);
}
let health_key = runtime_profile_route_circuit_health_key(&key);
let reopen_key = runtime_profile_route_circuit_reopen_key(profile_name, route_kind);
let health_score =
runtime_profile_effective_health_score_from_map(&runtime.profile_health, &health_key, now);
if health_score == 0 {
runtime.profile_route_circuit_open_until.remove(&key);
runtime.profile_health.remove(&reopen_key);
schedule_runtime_state_save_from_runtime(
shared,
&runtime,
&format!("profile_circuit_clear:{profile_name}:{route_label}"),
);
return Ok(true);
}
let probe_seconds = runtime_profile_circuit_half_open_probe_seconds(health_score);
let reserve_until = now.saturating_add(probe_seconds);
runtime
.profile_route_circuit_open_until
.insert(key, reserve_until);
schedule_runtime_state_save_from_runtime(
shared,
&runtime,
&format!("profile_circuit_half_open_probe:{profile_name}:{route_label}"),
);
drop(runtime);
runtime_proxy_log(
shared,
format!(
"profile_circuit_half_open_probe profile={profile_name} route={} until={reserve_until} health={health_score} probe_seconds={probe_seconds}",
route_label
),
);
Ok(true)
}
pub(crate) fn clear_runtime_profile_circuit_for_route(
runtime: &mut RuntimeRotationState,
profile_name: &str,
route_kind: RuntimeRouteKind,
) -> bool {
runtime
.profile_route_circuit_open_until
.remove(&runtime_profile_route_circuit_key(profile_name, route_kind))
.is_some()
}
pub(crate) fn runtime_profile_latency_penalty(
elapsed_ms: u64,
route_kind: RuntimeRouteKind,
stage: &str,
) -> u32 {
let (good_ms, warn_ms, poor_ms, severe_ms) = match (route_kind, stage) {
(RuntimeRouteKind::Responses, "ttfb") | (RuntimeRouteKind::Websocket, "connect") => {
(120, 300, 700, 1_500)
}
(RuntimeRouteKind::Compact, _) | (RuntimeRouteKind::Standard, _) => (80, 180, 400, 900),
_ => (100, 250, 600, 1_200),
};
match elapsed_ms {
elapsed if elapsed <= good_ms => 0,
elapsed if elapsed <= warn_ms => 2,
elapsed if elapsed <= poor_ms => 4,
elapsed if elapsed <= severe_ms => 7,
_ => RUNTIME_PROFILE_LATENCY_PENALTY_MAX,
}
}
pub(crate) fn update_runtime_profile_route_performance(
shared: &RuntimeRotationProxyShared,
profile_name: &str,
route_kind: RuntimeRouteKind,
next_score: u32,
reason: &str,
) -> Result<()> {
let mut runtime = shared
.runtime
.lock()
.map_err(|_| anyhow::anyhow!("runtime auto-rotate state is poisoned"))?;
let now = Local::now().timestamp();
let key = runtime_profile_route_performance_key(profile_name, route_kind);
if next_score == 0 {
runtime.profile_health.remove(&key);
} else {
runtime.profile_health.insert(
key,
RuntimeProfileHealth {
score: next_score.min(RUNTIME_PROFILE_LATENCY_PENALTY_MAX),
updated_at: now,
},
);
}
drop(runtime);
runtime_proxy_log(
shared,
format!(
"profile_latency profile={profile_name} route={} score={} reason={reason}",
runtime_route_kind_label(route_kind),
next_score.min(RUNTIME_PROFILE_LATENCY_PENALTY_MAX),
),
);
Ok(())
}
pub(crate) fn note_runtime_profile_latency_observation(
shared: &RuntimeRotationProxyShared,
profile_name: &str,
route_kind: RuntimeRouteKind,
stage: &str,
elapsed_ms: u64,
) {
let current_score = shared
.runtime
.lock()
.ok()
.map(|runtime| {
let now = Local::now().timestamp();
runtime_profile_effective_score_from_map(
&runtime.profile_health,
&runtime_profile_route_performance_key(profile_name, route_kind),
now,
RUNTIME_PROFILE_PERFORMANCE_DECAY_SECONDS,
)
})
.unwrap_or(0);
let observed = runtime_profile_latency_penalty(elapsed_ms, route_kind, stage);
let next_score = if observed == 0 {
current_score.saturating_sub(2)
} else {
(((current_score as u64) * 2) + (observed as u64)).div_ceil(3) as u32
};
let _ = update_runtime_profile_route_performance(
shared,
profile_name,
route_kind,
next_score,
&format!("{stage}_{elapsed_ms}ms"),
);
}
pub(crate) fn note_runtime_profile_latency_failure(
shared: &RuntimeRotationProxyShared,
profile_name: &str,
route_kind: RuntimeRouteKind,
stage: &str,
) {
let current_score = shared
.runtime
.lock()
.ok()
.map(|runtime| {
let now = Local::now().timestamp();
runtime_profile_effective_score_from_map(
&runtime.profile_health,
&runtime_profile_route_performance_key(profile_name, route_kind),
now,
RUNTIME_PROFILE_PERFORMANCE_DECAY_SECONDS,
)
})
.unwrap_or(0);
let next_score = current_score
.saturating_add(RUNTIME_PROFILE_TRANSPORT_FAILURE_HEALTH_PENALTY)
.min(RUNTIME_PROFILE_LATENCY_PENALTY_MAX);
let _ = update_runtime_profile_route_performance(
shared,
profile_name,
route_kind,
next_score,
stage,
);
}
pub(crate) fn reset_runtime_profile_success_streak(
runtime: &mut RuntimeRotationState,
profile_name: &str,
route_kind: RuntimeRouteKind,
) {
runtime
.profile_health
.remove(&runtime_profile_route_success_streak_key(
profile_name,
route_kind,
));
}
pub(crate) fn bump_runtime_profile_bad_pairing_score(
shared: &RuntimeRotationProxyShared,
profile_name: &str,
route_kind: RuntimeRouteKind,
delta: u32,
reason: &str,
) -> Result<()> {
let mut runtime = shared
.runtime
.lock()
.map_err(|_| anyhow::anyhow!("runtime auto-rotate state is poisoned"))?;
let now = Local::now().timestamp();
let key = runtime_profile_route_bad_pairing_key(profile_name, route_kind);
let next_score = runtime_profile_effective_score_from_map(
&runtime.profile_health,
&key,
now,
RUNTIME_PROFILE_BAD_PAIRING_DECAY_SECONDS,
)
.saturating_add(delta)
.min(RUNTIME_PROFILE_HEALTH_MAX_SCORE);
reset_runtime_profile_success_streak(&mut runtime, profile_name, route_kind);
runtime.profile_health.insert(
key,
RuntimeProfileHealth {
score: next_score,
updated_at: now,
},
);
schedule_runtime_state_save_from_runtime(
shared,
&runtime,
&format!(
"profile_bad_pairing:{profile_name}:{}",
runtime_route_kind_label(route_kind)
),
);
drop(runtime);
runtime_proxy_log(
shared,
format!(
"profile_bad_pairing profile={profile_name} route={} score={next_score} delta={delta} reason={reason}",
runtime_route_kind_label(route_kind)
),
);
Ok(())
}
pub(crate) fn bump_runtime_profile_health_score(
shared: &RuntimeRotationProxyShared,
profile_name: &str,
route_kind: RuntimeRouteKind,
delta: u32,
reason: &str,
) -> Result<()> {
let mut runtime = shared
.runtime
.lock()
.map_err(|_| anyhow::anyhow!("runtime auto-rotate state is poisoned"))?;
let now = Local::now().timestamp();
let key = runtime_profile_route_health_key(profile_name, route_kind);
let next_score = runtime
.profile_health
.get(&key)
.map(|entry| runtime_profile_effective_health_score(entry, now))
.unwrap_or(0)
.saturating_add(delta)
.min(RUNTIME_PROFILE_HEALTH_MAX_SCORE);
reset_runtime_profile_success_streak(&mut runtime, profile_name, route_kind);
runtime.profile_health.insert(
key,
RuntimeProfileHealth {
score: next_score,
updated_at: now,
},
);
let circuit_until = if next_score >= RUNTIME_PROFILE_CIRCUIT_OPEN_THRESHOLD {
let circuit_key = runtime_profile_route_circuit_key(profile_name, route_kind);
let reopen_stage = if runtime
.profile_route_circuit_open_until
.contains_key(&circuit_key)
{
runtime_profile_effective_score_from_map(
&runtime.profile_health,
&runtime_profile_route_circuit_reopen_key(profile_name, route_kind),
now,
RUNTIME_PROFILE_CIRCUIT_REOPEN_DECAY_SECONDS,
)
.saturating_add(1)
.min(RUNTIME_PROFILE_CIRCUIT_REOPEN_MAX_STAGE)
} else {
0
};
if reopen_stage == 0 {
runtime
.profile_health
.remove(&runtime_profile_route_circuit_reopen_key(
profile_name,
route_kind,
));
} else {
runtime.profile_health.insert(
runtime_profile_route_circuit_reopen_key(profile_name, route_kind),
RuntimeProfileHealth {
score: reopen_stage,
updated_at: now,
},
);
}
let until = now.saturating_add(runtime_profile_circuit_open_seconds(
next_score,
reopen_stage,
));
runtime
.profile_route_circuit_open_until
.entry(circuit_key)
.and_modify(|current| *current = (*current).max(until))
.or_insert(until);
Some((until, reopen_stage))
} else {
None
};
schedule_runtime_state_save_from_runtime(
shared,
&runtime,
&format!(
"profile_health:{profile_name}:{}",
runtime_route_kind_label(route_kind)
),
);
drop(runtime);
runtime_proxy_log(
shared,
format!(
"profile_health profile={profile_name} route={} score={next_score} delta={delta} reason={reason}",
runtime_route_kind_label(route_kind)
),
);
if let Some((until, reopen_stage)) = circuit_until {
runtime_proxy_log(
shared,
format!(
"profile_circuit_open profile={profile_name} route={} until={until} reopen_stage={reopen_stage} reason={reason} score={next_score}",
runtime_route_kind_label(route_kind)
),
);
}
Ok(())
}
pub(crate) fn recover_runtime_profile_health_for_route(
runtime: &mut RuntimeRotationState,
profile_name: &str,
route_kind: RuntimeRouteKind,
now: i64,
) {
let key = runtime_profile_route_health_key(profile_name, route_kind);
let streak_key = runtime_profile_route_success_streak_key(profile_name, route_kind);
let Some(current_score) = runtime
.profile_health
.get(&key)
.map(|entry| runtime_profile_effective_health_score(entry, now))
else {
runtime.profile_health.remove(&streak_key);
return;
};
let next_streak = runtime_profile_effective_score_from_map(
&runtime.profile_health,
&streak_key,
now,
RUNTIME_PROFILE_SUCCESS_STREAK_DECAY_SECONDS,
)
.saturating_add(1)
.min(RUNTIME_PROFILE_SUCCESS_STREAK_MAX);
let recovery = RUNTIME_PROFILE_HEALTH_SUCCESS_RECOVERY_SCORE
.saturating_add(next_streak.saturating_sub(1).min(1));
let next_score = current_score.saturating_sub(recovery);
if next_score == 0 {
runtime.profile_health.remove(&key);
runtime.profile_health.remove(&streak_key);
} else {
runtime.profile_health.insert(
key,
RuntimeProfileHealth {
score: next_score,
updated_at: now,
},
);
runtime.profile_health.insert(
streak_key,
RuntimeProfileHealth {
score: next_streak,
updated_at: now,
},
);
}
}
pub(crate) fn mark_runtime_profile_retry_backoff(
shared: &RuntimeRotationProxyShared,
profile_name: &str,
) -> Result<()> {
let mut runtime = shared
.runtime
.lock()
.map_err(|_| anyhow::anyhow!("runtime auto-rotate state is poisoned"))?;
let now = Local::now().timestamp();
prune_runtime_profile_selection_backoff(&mut runtime, now);
runtime.profile_probe_cache.remove(profile_name);
let until = now.saturating_add(RUNTIME_PROFILE_RETRY_BACKOFF_SECONDS);
runtime
.profile_retry_backoff_until
.insert(profile_name.to_string(), until);
schedule_runtime_state_save_from_runtime(
shared,
&runtime,
&format!("profile_retry_backoff:{profile_name}"),
);
drop(runtime);
runtime_proxy_log(
shared,
format!("profile_retry_backoff profile={profile_name} until={until}"),
);
Ok(())
}
pub(crate) fn mark_runtime_profile_transport_backoff(
shared: &RuntimeRotationProxyShared,
profile_name: &str,
route_kind: RuntimeRouteKind,
context: &str,
) -> Result<()> {
let mut runtime = shared
.runtime
.lock()
.map_err(|_| anyhow::anyhow!("runtime auto-rotate state is poisoned"))?;
let now = Local::now().timestamp();
prune_runtime_profile_selection_backoff(&mut runtime, now);
runtime.profile_probe_cache.remove(profile_name);
let route_key = runtime_profile_transport_backoff_key(profile_name, route_kind);
let existing_remaining = runtime_profile_transport_backoff_until_from_map(
&runtime.profile_transport_backoff_until,
profile_name,
route_kind,
now,
)
.unwrap_or(now)
.saturating_sub(now);
let next_backoff_seconds = if existing_remaining > 0 {
existing_remaining.saturating_mul(2).clamp(
RUNTIME_PROFILE_TRANSPORT_BACKOFF_SECONDS,
RUNTIME_PROFILE_TRANSPORT_BACKOFF_MAX_SECONDS,
)
} else {
RUNTIME_PROFILE_TRANSPORT_BACKOFF_SECONDS
};
let until = now.saturating_add(next_backoff_seconds);
runtime
.profile_transport_backoff_until
.entry(route_key)
.and_modify(|current| *current = (*current).max(until))
.or_insert(until);
schedule_runtime_state_save_from_runtime(
shared,
&runtime,
&format!(
"profile_transport_backoff:{profile_name}:{}",
runtime_route_kind_label(route_kind)
),
);
drop(runtime);
runtime_proxy_log(
shared,
format!(
"profile_transport_backoff profile={profile_name} route={} until={until} seconds={next_backoff_seconds} context={context}",
runtime_route_kind_label(route_kind)
),
);
Ok(())
}
pub(crate) fn note_runtime_profile_transport_failure(
shared: &RuntimeRotationProxyShared,
profile_name: &str,
route_kind: RuntimeRouteKind,
context: &str,
err: &anyhow::Error,
) {
let Some(failure_kind) = runtime_proxy_transport_failure_kind(err) else {
return;
};
runtime_proxy_log(
shared,
format!(
"profile_transport_failure profile={profile_name} route={} class={} context={context}",
runtime_route_kind_label(route_kind),
runtime_transport_failure_kind_label(failure_kind),
),
);
let _ = bump_runtime_profile_health_score(
shared,
profile_name,
route_kind,
runtime_profile_transport_health_penalty(failure_kind),
context,
);
let _ = bump_runtime_profile_bad_pairing_score(
shared,
profile_name,
route_kind,
RUNTIME_PROFILE_BAD_PAIRING_PENALTY,
context,
);
note_runtime_profile_latency_failure(shared, profile_name, route_kind, context);
let _ = mark_runtime_profile_transport_backoff(shared, profile_name, route_kind, context);
}
pub(crate) fn clear_runtime_profile_transport_backoff_for_route(
runtime: &mut RuntimeRotationState,
profile_name: &str,
route_kind: RuntimeRouteKind,
) -> bool {
let mut changed = runtime
.profile_transport_backoff_until
.remove(&runtime_profile_transport_backoff_key(
profile_name,
route_kind,
))
.is_some();
changed = runtime
.profile_transport_backoff_until
.remove(profile_name)
.is_some()
|| changed;
changed
}
pub(crate) fn commit_runtime_proxy_profile_selection(
shared: &RuntimeRotationProxyShared,
profile_name: &str,
route_kind: RuntimeRouteKind,
) -> Result<bool> {
commit_runtime_proxy_profile_selection_with_policy(shared, profile_name, route_kind, true)
}
pub(crate) fn commit_runtime_proxy_profile_selection_with_policy(
shared: &RuntimeRotationProxyShared,
profile_name: &str,
route_kind: RuntimeRouteKind,
track_current_profile: bool,
) -> Result<bool> {
let mut runtime = shared
.runtime
.lock()
.map_err(|_| anyhow::anyhow!("runtime auto-rotate state is poisoned"))?;
let switch_runtime_profile = track_current_profile && runtime.current_profile != profile_name;
let switch_global_profile =
track_current_profile && !matches!(route_kind, RuntimeRouteKind::Compact);
let switched = switch_runtime_profile;
let now = Local::now().timestamp();
let cleared_retry_backoff = runtime
.profile_retry_backoff_until
.remove(profile_name)
.is_some();
let cleared_transport_backoff =
clear_runtime_profile_transport_backoff_for_route(&mut runtime, profile_name, route_kind);
let cleared_route_circuit =
clear_runtime_profile_circuit_for_route(&mut runtime, profile_name, route_kind);
let cleared_health =
clear_runtime_profile_health_for_route(&mut runtime, profile_name, route_kind, now);
if switch_runtime_profile {
runtime.current_profile = profile_name.to_string();
}
let state_changed =
switch_global_profile && runtime.state.active_profile.as_deref() != Some(profile_name);
if switch_global_profile {
runtime.state.active_profile = Some(profile_name.to_string());
record_run_selection(&mut runtime.state, profile_name);
}
let should_persist = switched
|| state_changed
|| cleared_retry_backoff
|| cleared_transport_backoff
|| cleared_route_circuit
|| cleared_health;
if should_persist {
schedule_runtime_state_save_from_runtime(
shared,
&runtime,
&format!("profile_commit:{profile_name}"),
);
}
drop(runtime);
if switch_runtime_profile {
update_runtime_broker_current_profile(&shared.log_path, profile_name);
}
runtime_proxy_log(
shared,
format!(
"profile_commit profile={profile_name} route={} switched={switched} persisted={should_persist} track_current_profile={track_current_profile} cleared_route_circuit={cleared_route_circuit}",
runtime_route_kind_label(route_kind),
),
);
Ok(switched)
}
pub(crate) fn clear_runtime_profile_health_for_route(
runtime: &mut RuntimeRotationState,
profile_name: &str,
route_kind: RuntimeRouteKind,
now: i64,
) -> bool {
let mut changed = runtime.profile_health.remove(profile_name).is_some();
let previous_route_score =
runtime_profile_route_health_score(runtime, profile_name, now, route_kind);
let previous_bad_pairing = runtime_profile_effective_score_from_map(
&runtime.profile_health,
&runtime_profile_route_bad_pairing_key(profile_name, route_kind),
now,
RUNTIME_PROFILE_BAD_PAIRING_DECAY_SECONDS,
);
let previous_circuit_reopen = runtime_profile_effective_score_from_map(
&runtime.profile_health,
&runtime_profile_route_circuit_reopen_key(profile_name, route_kind),
now,
RUNTIME_PROFILE_CIRCUIT_REOPEN_DECAY_SECONDS,
);
recover_runtime_profile_health_for_route(runtime, profile_name, route_kind, now);
changed = changed || previous_route_score > 0;
changed = changed || previous_bad_pairing > 0;
changed = changed || previous_circuit_reopen > 0;
changed = runtime
.profile_health
.remove(&runtime_profile_route_bad_pairing_key(
profile_name,
route_kind,
))
.is_some()
|| changed;
changed = runtime
.profile_health
.remove(&runtime_profile_route_circuit_reopen_key(
profile_name,
route_kind,
))
.is_some()
|| changed;
changed
}
pub(crate) fn commit_runtime_proxy_profile_selection_with_notice(
shared: &RuntimeRotationProxyShared,
profile_name: &str,
route_kind: RuntimeRouteKind,
) -> Result<()> {
let _ = commit_runtime_proxy_profile_selection(shared, profile_name, route_kind)?;
Ok(())
}