use std::collections::HashMap;
use std::time::{Duration, Instant};
use super::super::backend_registry::BackendKey;
/// Default value for `HandoffFallbackPolicy::max_failed_attempts_per_window`.
pub const DEFAULT_HANDOFF_FAILED_ATTEMPTS_PER_WINDOW: usize = 8;
/// Default value for `HandoffFallbackPolicy::failed_attempt_window` (30 seconds).
pub const DEFAULT_HANDOFF_FAILED_ATTEMPT_WINDOW: Duration = Duration::from_secs(30);
/// Tunables for rate limiting handoff attempts after failures.
///
/// The fields are public, so a struct literal can hold any values; only
/// [`HandoffFallbackPolicy::new`] clamps degenerate ones.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct HandoffFallbackPolicy {
    /// Failed-attempt count, within one window, at which handoff is rate limited.
    pub max_failed_attempts_per_window: usize,
    /// Length of the window over which failed attempts are counted.
    pub failed_attempt_window: Duration,
}

impl HandoffFallbackPolicy {
    /// Builds a policy, replacing degenerate values with the smallest usable ones.
    ///
    /// * A limit of `0` becomes `1`.
    /// * A zero-length window becomes one millisecond; any non-zero window is kept as is.
    pub fn new(max_failed_attempts_per_window: usize, failed_attempt_window: Duration) -> Self {
        let limit = if max_failed_attempts_per_window == 0 {
            1
        } else {
            max_failed_attempts_per_window
        };
        let window = if failed_attempt_window == Duration::ZERO {
            Duration::from_millis(1)
        } else {
            failed_attempt_window
        };
        Self {
            max_failed_attempts_per_window: limit,
            failed_attempt_window: window,
        }
    }
}
impl Default for HandoffFallbackPolicy {
    /// Returns the policy described by the `DEFAULT_HANDOFF_*` constants.
    fn default() -> Self {
        // Both defaults are non-degenerate, so `new` passes them through unchanged;
        // going through it keeps a single construction path for policies.
        Self::new(
            DEFAULT_HANDOFF_FAILED_ATTEMPTS_PER_WINDOW,
            DEFAULT_HANDOFF_FAILED_ATTEMPT_WINDOW,
        )
    }
}
/// Per-request facts that gate whether a handoff may be attempted at all.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct HandoffAttemptInputs {
    /// `true` when the client supports handoff.
    pub client_supports_handoff: bool,
    /// `true` when service policy allows handoff.
    pub service_policy_enabled: bool,
    /// `true` when handoff is disabled because of fd pressure.
    pub fd_pressure_disabled: bool,
    /// `true` only for inputs built with [`HandoffAttemptInputs::adopted_backend`].
    pub backend_adopted_existing: bool,
}

impl HandoffAttemptInputs {
    /// Builds inputs for a backend that was not adopted
    /// (`backend_adopted_existing` is `false`).
    pub fn new(
        client_supports_handoff: bool,
        service_policy_enabled: bool,
        fd_pressure_disabled: bool,
    ) -> Self {
        let backend_adopted_existing = false;
        Self {
            client_supports_handoff,
            service_policy_enabled,
            fd_pressure_disabled,
            backend_adopted_existing,
        }
    }

    /// Inputs with every gate open: client support and service policy on,
    /// no fd pressure, backend not adopted.
    pub fn enabled() -> Self {
        Self {
            client_supports_handoff: true,
            service_policy_enabled: true,
            fd_pressure_disabled: false,
            backend_adopted_existing: false,
        }
    }

    /// Inputs for an adopted backend: service policy on, no fd pressure, and
    /// `backend_adopted_existing` set.
    pub fn adopted_backend(client_supports_handoff: bool) -> Self {
        Self {
            backend_adopted_existing: true,
            ..Self::new(client_supports_handoff, true, false)
        }
    }
}
/// Result of deciding whether to try a handoff.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum HandoffAttemptDecision {
    /// Go ahead and attempt the handoff.
    Attempt,
    /// Do not hand off; the payload records why.
    FallbackToReconnect(HandoffFallbackDecision),
}

impl HandoffAttemptDecision {
    /// Returns the fallback details, or `None` when the decision is
    /// [`HandoffAttemptDecision::Attempt`].
    pub fn fallback(&self) -> Option<&HandoffFallbackDecision> {
        if let Self::FallbackToReconnect(details) = self {
            Some(details)
        } else {
            None
        }
    }
}
/// Why a handoff was skipped or abandoned, plus an optional retry hint.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HandoffFallbackDecision {
    /// Cause of the fallback.
    pub reason: HandoffFallbackReason,
    /// Delay hint for the next attempt; `None` when no hint is given.
    pub retry_after: Option<Duration>,
}

impl HandoffFallbackDecision {
    /// Creates a decision with no retry hint.
    pub fn new(reason: HandoffFallbackReason) -> Self {
        let retry_after = None;
        Self {
            reason,
            retry_after,
        }
    }

    /// Creates a decision that carries `retry_after` as its retry hint.
    pub fn with_retry_after(reason: HandoffFallbackReason, retry_after: Duration) -> Self {
        Self {
            retry_after: Some(retry_after),
            ..Self::new(reason)
        }
    }

    /// Currently always `true`, regardless of the reason.
    pub fn uses_backend_reconnect(&self) -> bool {
        true
    }

    /// Currently always `false`, regardless of the reason.
    pub fn sends_client_error(&self) -> bool {
        false
    }
}
/// Cause recorded on a fallback decision.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum HandoffFallbackReason {
    /// The client does not support handoff.
    ClientUnsupported,
    /// Service policy has handoff turned off.
    ServicePolicyDisabled,
    /// Handoff is disabled because of fd pressure.
    FdPressureDisabled,
    /// Too many failed attempts were recorded in the current window.
    FailedAttemptRateLimited,
    /// Counterpart of [`HandoffAttemptFailure::PermissionDenied`].
    PermissionDenied,
    /// Counterpart of [`HandoffAttemptFailure::IntegrityMismatch`].
    IntegrityMismatch,
    /// Counterpart of [`HandoffAttemptFailure::BackendAckTimeout`].
    BackendAckTimeout,
    /// The backend was flagged as adopted in the attempt inputs.
    AdoptedBackend,
}

/// Ways an attempted handoff can fail; each maps onto the same-named
/// [`HandoffFallbackReason`] variant.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum HandoffAttemptFailure {
    /// The attempt failed with a permission error.
    PermissionDenied,
    /// The attempt failed an integrity check.
    IntegrityMismatch,
    /// The backend acknowledgement timed out.
    BackendAckTimeout,
}

impl From<HandoffAttemptFailure> for HandoffFallbackReason {
    /// Maps a failure onto the fallback reason with the same name.
    fn from(failure: HandoffAttemptFailure) -> Self {
        // No wildcard arm: adding a failure variant must fail to compile here
        // until it is given a reason.
        match failure {
            HandoffAttemptFailure::BackendAckTimeout => HandoffFallbackReason::BackendAckTimeout,
            HandoffAttemptFailure::IntegrityMismatch => HandoffFallbackReason::IntegrityMismatch,
            HandoffAttemptFailure::PermissionDenied => HandoffFallbackReason::PermissionDenied,
        }
    }
}
/// Per-backend bookkeeping of failed handoff attempts, used to decide whether a
/// handoff should be tried or skipped.
#[derive(Debug)]
pub struct HandoffFallbackState {
    /// Effective policy; always normalised (limit >= 1, non-zero window).
    policy: HandoffFallbackPolicy,
    /// Failure window per backend. An entry is created by the first recorded
    /// failure and removed only by `record_success`.
    failed_attempts: HashMap<BackendKey, FailedAttemptWindow>,
}

impl HandoffFallbackState {
    /// Creates a state that uses the default policy.
    pub fn new() -> Self {
        Self::with_policy(HandoffFallbackPolicy::default())
    }

    /// Creates a state that uses `policy`, normalised through
    /// [`HandoffFallbackPolicy::new`].
    ///
    /// The policy's fields are public, so a struct literal can carry values the
    /// constructor would have clamped, and both break the limiter:
    ///
    /// * a limit of `0` caps every count at `0`, and `0 < 0` is false, so a
    ///   backend is rate limited after its first failure and stays so across
    ///   window rollovers;
    /// * a zero-length window resets the count on every check, so the limit is
    ///   never reached.
    ///
    /// Valid policies are stored unchanged.
    pub fn with_policy(policy: HandoffFallbackPolicy) -> Self {
        let policy = HandoffFallbackPolicy::new(
            policy.max_failed_attempts_per_window,
            policy.failed_attempt_window,
        );
        Self {
            policy,
            failed_attempts: HashMap::new(),
        }
    }

    /// Returns the effective (normalised) policy.
    pub fn policy(&self) -> HandoffFallbackPolicy {
        self.policy
    }

    /// Decides whether a handoff to `backend` should be attempted at `now`.
    ///
    /// The input gates are checked first, in a fixed order (client support,
    /// service policy, fd pressure, adopted backend); the first one that blocks
    /// determines the fallback reason. Only then is the failure rate limit
    /// consulted; a rate-limited fallback carries the time left in the current
    /// window as `retry_after`.
    ///
    /// Takes `&mut self` because an expired failure window for `backend` is
    /// reset as a side effect of the rate-limit check.
    pub fn should_attempt(
        &mut self,
        backend: &BackendKey,
        inputs: HandoffAttemptInputs,
        now: Instant,
    ) -> HandoffAttemptDecision {
        if !inputs.client_supports_handoff {
            return fallback(HandoffFallbackReason::ClientUnsupported);
        }
        if !inputs.service_policy_enabled {
            return fallback(HandoffFallbackReason::ServicePolicyDisabled);
        }
        if inputs.fd_pressure_disabled {
            return fallback(HandoffFallbackReason::FdPressureDisabled);
        }
        if inputs.backend_adopted_existing {
            return fallback(HandoffFallbackReason::AdoptedBackend);
        }

        self.rate_limit_for(backend, now)
            .map_or(HandoffAttemptDecision::Attempt, |retry_after| {
                HandoffAttemptDecision::FallbackToReconnect(
                    HandoffFallbackDecision::with_retry_after(
                        HandoffFallbackReason::FailedAttemptRateLimited,
                        retry_after,
                    ),
                )
            })
    }

    /// Records a failed attempt against `backend` and returns the fallback
    /// decision for `failure` (without a retry hint).
    ///
    /// The per-window count saturates at the policy limit.
    pub fn record_failed_attempt(
        &mut self,
        backend: BackendKey,
        failure: HandoffAttemptFailure,
        now: Instant,
    ) -> HandoffAttemptDecision {
        let policy = self.policy;
        let window = self
            .failed_attempts
            .entry(backend)
            .or_insert_with(|| FailedAttemptWindow::new(now));
        // Roll the window over before counting, so a failure that arrives after
        // expiry is the first of a fresh window rather than the last of a stale one.
        window.refresh_if_expired(now, policy.failed_attempt_window);
        window.count = window
            .count
            .saturating_add(1)
            .min(policy.max_failed_attempts_per_window);
        fallback(HandoffFallbackReason::from(failure))
    }

    /// Forgets every failure recorded for `backend`.
    pub fn record_success(&mut self, backend: &BackendKey) {
        self.failed_attempts.remove(backend);
    }

    /// Returns the failures counted for `backend` in the window current at
    /// `now`, or `0` if none are tracked. An expired window is reset first.
    pub fn failed_attempt_count(&mut self, backend: &BackendKey, now: Instant) -> usize {
        self.live_window(backend, now)
            .map_or(0, |window| window.count)
    }

    /// Returns how long `backend` stays rate limited, or `None` if it is not
    /// (no tracked failures, or fewer than the policy limit).
    fn rate_limit_for(&mut self, backend: &BackendKey, now: Instant) -> Option<Duration> {
        let policy = self.policy;
        let window = self.live_window(backend, now)?;
        (window.count >= policy.max_failed_attempts_per_window)
            .then(|| window.retry_after(now, policy.failed_attempt_window))
    }

    /// Looks up the tracked window for `backend`, resetting it first if it has
    /// expired at `now`. Returns `None` when nothing is tracked.
    fn live_window(
        &mut self,
        backend: &BackendKey,
        now: Instant,
    ) -> Option<&mut FailedAttemptWindow> {
        let window_len = self.policy.failed_attempt_window;
        let window = self.failed_attempts.get_mut(backend)?;
        window.refresh_if_expired(now, window_len);
        Some(window)
    }
}
impl Default for HandoffFallbackState {
fn default() -> Self {
Self::new()
}
}
/// Fixed window counter: a start instant plus the number of failures counted
/// since then.
#[derive(Clone, Debug)]
struct FailedAttemptWindow {
    /// Instant at which the current window began.
    started_at: Instant,
    /// Failures counted in the current window.
    count: usize,
}

impl FailedAttemptWindow {
    /// Opens an empty window that starts at `now`.
    fn new(now: Instant) -> Self {
        Self {
            count: 0,
            started_at: now,
        }
    }

    /// Restarts the window at `now` with a zero count once at least `window`
    /// has elapsed since it started; otherwise leaves it untouched.
    fn refresh_if_expired(&mut self, now: Instant, window: Duration) {
        // A `now` earlier than the window start is treated as "not expired".
        let Some(elapsed) = now.checked_duration_since(self.started_at) else {
            return;
        };
        if elapsed >= window {
            *self = Self::new(now);
        }
    }

    /// Time left until the window that started at `started_at` is `window` old.
    ///
    /// Returns zero once that point has passed, and the full `window` when
    /// `now` is earlier than the window start.
    fn retry_after(&self, now: Instant, window: Duration) -> Duration {
        let elapsed = now.saturating_duration_since(self.started_at);
        window.saturating_sub(elapsed)
    }
}
/// Wraps `reason` in a reconnect-fallback decision that carries no retry hint.
fn fallback(reason: HandoffFallbackReason) -> HandoffAttemptDecision {
    let decision = HandoffFallbackDecision::new(reason);
    HandoffAttemptDecision::FallbackToReconnect(decision)
}