use std::collections::HashMap;
use std::time::{Duration, Instant};
use super::backend_registry::BackendKey;
use super::spawn_coordinator::DEFAULT_SPAWN_BUDGET_WINDOW;
/// Retry backoff used by `BackendRecoveryPolicy::default()`: 250 milliseconds.
pub const DEFAULT_RECOVERY_RETRY_BACKOFF: Duration = Duration::from_millis(250);
/// Budget window used by `BackendRecoveryPolicy::default()`.
///
/// Defined as the spawn coordinator's `DEFAULT_SPAWN_BUDGET_WINDOW`, so the two
/// defaults always share the same value.
pub const DEFAULT_RECOVERY_BUDGET_WINDOW: Duration = DEFAULT_SPAWN_BUDGET_WINDOW;
/// Timing parameters applied when a backend crash is recorded.
///
/// NOTE(review): both fields are public, so a struct literal can carry zero
/// durations that `BackendRecoveryPolicy::new` would have clamped to 1 ms.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct BackendRecoveryPolicy {
/// Delay reported as `retry_after` when a crash is answered with `Retry`.
pub retry_backoff: Duration,
/// Length of the window over which crashes are counted; once this much time
/// has elapsed since the window started, the window and its count restart.
pub budget_window: Duration,
}
impl BackendRecoveryPolicy {
    /// Builds a policy from the given durations.
    ///
    /// Each duration is passed through `non_zero_duration`, so a zero value is
    /// replaced by that helper's minimum instead of being stored as-is.
    pub fn new(retry_backoff: Duration, budget_window: Duration) -> Self {
        let retry_backoff = non_zero_duration(retry_backoff);
        let budget_window = non_zero_duration(budget_window);
        Self {
            retry_backoff,
            budget_window,
        }
    }
}
impl Default for BackendRecoveryPolicy {
    /// Returns the policy built from `DEFAULT_RECOVERY_RETRY_BACKOFF` and
    /// `DEFAULT_RECOVERY_BUDGET_WINDOW`.
    ///
    /// The constants are stored directly; they are not routed through `new`.
    fn default() -> Self {
        let retry_backoff = DEFAULT_RECOVERY_RETRY_BACKOFF;
        let budget_window = DEFAULT_RECOVERY_BUDGET_WINDOW;
        Self {
            retry_backoff,
            budget_window,
        }
    }
}
/// Crash bookkeeping for a set of backends, all governed by one policy.
#[derive(Debug)]
pub struct BackendRecoveryState {
/// Policy applied to every backend tracked in `entries`.
policy: BackendRecoveryPolicy,
/// Crash window and crash count per backend. An entry is created on the
/// first recorded crash and removed by `record_success`.
entries: HashMap<BackendKey, BackendRecoveryEntry>,
}
impl BackendRecoveryState {
pub fn new() -> Self {
Self::with_policy(BackendRecoveryPolicy::default())
}
pub fn with_policy(policy: BackendRecoveryPolicy) -> Self {
Self {
policy,
entries: HashMap::new(),
}
}
pub fn record_crash(&mut self, key: BackendKey, now: Instant) -> BackendRecoveryDecision {
let entry = self
.entries
.entry(key)
.or_insert_with(|| BackendRecoveryEntry::new(now));
entry.refresh(now, self.policy.budget_window);
entry.crashes_in_window = entry.crashes_in_window.saturating_add(1);
if entry.crashes_in_window == 1 {
return BackendRecoveryDecision::Retry {
retry_after: self.policy.retry_backoff,
attempt: 1,
};
}
BackendRecoveryDecision::Refuse {
reason: BackendRecoveryRefusalReason::BackendUnavailable,
retry_after: retry_after(entry.window_started_at, now, self.policy.budget_window),
}
}
pub fn record_success(&mut self, key: &BackendKey) {
self.entries.remove(key);
}
}
impl Default for BackendRecoveryState {
fn default() -> Self {
Self::new()
}
}
/// Outcome returned by `BackendRecoveryState::record_crash`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum BackendRecoveryDecision {
/// Returned for the first crash counted in a budget window.
Retry {
/// Delay attached to the retry; `record_crash` fills it with the policy's
/// `retry_backoff`.
retry_after: Duration,
/// Attempt number; `record_crash` only ever produces `1` here.
attempt: u32,
},
/// Returned for every further crash counted in the same budget window.
Refuse {
/// Why the retry was refused.
reason: BackendRecoveryRefusalReason,
/// Time remaining until the current budget window ends, as computed by
/// `record_crash`.
retry_after: Duration,
},
}
/// Reason attached to a `BackendRecoveryDecision::Refuse`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BackendRecoveryRefusalReason {
/// The only reason `record_crash` reports: the backend crashed more than
/// once within the current budget window.
BackendUnavailable,
}
/// Crash bookkeeping for a single backend.
#[derive(Clone, Debug)]
struct BackendRecoveryEntry {
/// Instant at which the current budget window began.
window_started_at: Instant,
/// Number of crashes counted since `window_started_at`.
crashes_in_window: u32,
}
impl BackendRecoveryEntry {
    /// Creates an entry whose window starts at `now` with no crashes counted.
    fn new(now: Instant) -> Self {
        Self {
            crashes_in_window: 0,
            window_started_at: now,
        }
    }

    /// Restarts the window at `now` with a zero crash count once at least
    /// `budget_window` has elapsed since the window started; otherwise leaves
    /// the entry untouched.
    fn refresh(&mut self, now: Instant, budget_window: Duration) {
        let window_age = elapsed_since(self.window_started_at, now);
        if window_age < budget_window {
            return;
        }
        // A brand-new entry is exactly "window starts now, zero crashes".
        *self = Self::new(now);
    }
}
/// Returns `duration` unchanged unless it is zero, in which case one
/// millisecond is returned instead.
fn non_zero_duration(duration: Duration) -> Duration {
    const FLOOR: Duration = Duration::from_millis(1);
    if duration == Duration::ZERO {
        FLOOR
    } else {
        duration
    }
}
/// Returns how much of `budget_window` is left at `now` for a window that
/// began at `window_started_at`, bottoming out at zero once the window has
/// been used up.
fn retry_after(window_started_at: Instant, now: Instant, budget_window: Duration) -> Duration {
    let consumed = elapsed_since(window_started_at, now);
    budget_window.saturating_sub(consumed)
}
/// Returns the time from `started_at` to `now`, or zero when `now` is earlier
/// than `started_at`.
fn elapsed_since(started_at: Instant, now: Instant) -> Duration {
    now.saturating_duration_since(started_at)
}