use std::collections::HashMap;
use std::time::{Duration, Instant};
use super::backend_registry::BackendKey;
use super::broadcast::QuiesceReason;
/// Idle timeout (30 seconds) used by [`BackendIdlePolicy::default`].
pub const DEFAULT_BACKEND_IDLE_TIMEOUT: Duration = Duration::from_secs(30);
/// Configuration for how long a backend may stay idle before it becomes due
/// for quiescing.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct BackendIdlePolicy {
/// Timeout the coordinator applies when activity is recorded without an
/// explicit per-call timeout.
///
/// The field is public, so a struct literal can set it to zero;
/// [`BackendIdlePolicy::new`] raises a zero value to one millisecond.
pub default_idle_timeout: Duration,
}
impl BackendIdlePolicy {
    /// Builds a policy with the given default idle timeout.
    ///
    /// A zero `default_idle_timeout` is replaced by one millisecond; any
    /// other value is stored unchanged.
    pub fn new(default_idle_timeout: Duration) -> Self {
        let default_idle_timeout = non_zero_duration(default_idle_timeout);
        Self {
            default_idle_timeout,
        }
    }

    /// Returns the stored default idle timeout exactly as held in the field.
    pub fn default_idle_timeout(&self) -> Duration {
        let Self {
            default_idle_timeout,
        } = *self;
        default_idle_timeout
    }
}
impl Default for BackendIdlePolicy {
    /// Produces a policy whose timeout is [`DEFAULT_BACKEND_IDLE_TIMEOUT`].
    fn default() -> Self {
        let default_idle_timeout = DEFAULT_BACKEND_IDLE_TIMEOUT;
        Self {
            default_idle_timeout,
        }
    }
}
/// Tracks the last recorded activity of each backend and reports which ones
/// have been idle for at least their configured timeout.
#[derive(Debug)]
pub struct BackendIdleCoordinator {
// Supplies the timeout used by `mark_activity`.
policy: BackendIdlePolicy,
// One idle-tracking record per backend key.
entries: HashMap<BackendKey, BackendIdleEntry>,
}
impl BackendIdleCoordinator {
    /// Creates an empty coordinator that uses [`BackendIdlePolicy::default`].
    pub fn new() -> Self {
        let policy = BackendIdlePolicy::default();
        Self::with_policy(policy)
    }

    /// Creates an empty coordinator that uses the supplied `policy`.
    pub fn with_policy(policy: BackendIdlePolicy) -> Self {
        let entries = HashMap::new();
        Self { policy, entries }
    }

    /// Records activity for `key` at `now` using the policy's default timeout.
    ///
    /// This replaces any existing entry for `key`, so a timeout previously set
    /// through [`Self::mark_activity_with_timeout`] is overwritten by the
    /// policy default.
    pub fn mark_activity(&mut self, key: BackendKey, now: Instant) {
        let idle_timeout = self.policy.default_idle_timeout;
        self.mark_activity_with_timeout(key, now, idle_timeout);
    }

    /// Records activity for `key` at `now` with an explicit `idle_timeout`.
    ///
    /// Any existing entry for `key` is replaced: the backend is put back into
    /// the running state, its last-activity instant becomes `now`, and its
    /// timeout becomes `idle_timeout` (a zero timeout is raised to one
    /// millisecond).
    pub fn mark_activity_with_timeout(
        &mut self,
        key: BackendKey,
        now: Instant,
        idle_timeout: Duration,
    ) {
        let refreshed = BackendIdleEntry {
            last_active_at: now,
            idle_timeout: non_zero_duration(idle_timeout),
            state: BackendIdleState::Running,
        };
        self.entries.insert(key, refreshed);
    }

    /// Moves the backend into the draining state.
    ///
    /// Returns `false` when `key` is not tracked, `true` otherwise.
    pub fn mark_draining(&mut self, key: &BackendKey) -> bool {
        self.mark_state(key, BackendIdleState::Draining)
    }

    /// Moves the backend into the quiesced state.
    ///
    /// Returns `false` when `key` is not tracked, `true` otherwise.
    pub fn mark_quiesced(&mut self, key: &BackendKey) -> bool {
        self.mark_state(key, BackendIdleState::Quiesced)
    }

    /// Stops tracking `key`; returns whether an entry was actually removed.
    pub fn remove_backend(&mut self, key: &BackendKey) -> bool {
        let removed = self.entries.remove(key);
        removed.is_some()
    }

    /// Returns every running backend that has been idle for at least its
    /// timeout as of `now`, and switches each of them to the draining state.
    ///
    /// Backends that are already draining or quiesced are skipped, so a
    /// backend is reported at most once until activity is recorded for it
    /// again. The result follows the map's iteration order.
    pub fn collect_due_for_quiesce(&mut self, now: Instant) -> Vec<BackendIdleDue> {
        self.entries
            .iter_mut()
            .filter_map(|(key, entry)| {
                if entry.state != BackendIdleState::Running {
                    return None;
                }
                let idle_for = elapsed_since(entry.last_active_at, now);
                if idle_for < entry.idle_timeout {
                    return None;
                }
                // Flip the state before reporting so the next sweep does not
                // report the same backend again.
                entry.state = BackendIdleState::Draining;
                Some(BackendIdleDue {
                    key: key.clone(),
                    idle_for,
                    configured_timeout: entry.idle_timeout,
                    reason: QuiesceReason::IdleTimeout,
                })
            })
            .collect()
    }

    /// Overwrites the state of a tracked backend; `false` means `key` is unknown.
    fn mark_state(&mut self, key: &BackendKey, state: BackendIdleState) -> bool {
        match self.entries.get_mut(key) {
            Some(entry) => {
                entry.state = state;
                true
            }
            None => false,
        }
    }
}
impl Default for BackendIdleCoordinator {
fn default() -> Self {
Self::new()
}
}
/// Report for one backend that `BackendIdleCoordinator::collect_due_for_quiesce`
/// found to be idle for at least its timeout.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BackendIdleDue {
/// Key of the backend that is due.
pub key: BackendKey,
/// Time between the backend's last recorded activity and the sweep instant.
pub idle_for: Duration,
/// Idle timeout that was in effect for this backend.
pub configured_timeout: Duration,
/// Why the backend is due; the idle sweep sets `QuiesceReason::IdleTimeout`.
pub reason: QuiesceReason,
}
/// Idle-tracking state the coordinator stores for each backend.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum BackendIdleState {
// Set whenever activity is recorded; the only state the idle sweep considers.
Running,
// Set by `mark_draining`, or by the idle sweep once a backend is reported as due.
Draining,
// Set by `mark_quiesced`.
Quiesced,
}
/// Per-backend record kept by `BackendIdleCoordinator`.
#[derive(Clone, Debug)]
struct BackendIdleEntry {
// Instant passed to the most recent activity mark for this backend.
last_active_at: Instant,
// Idle duration at which the backend becomes due; stored non-zero.
idle_timeout: Duration,
// Current idle-tracking state.
state: BackendIdleState,
}
/// Guarantees a strictly positive duration.
///
/// Any non-zero `duration` is returned unchanged (including values shorter
/// than a millisecond); a zero `duration` is replaced by one millisecond.
fn non_zero_duration(duration: Duration) -> Duration {
    if duration > Duration::ZERO {
        return duration;
    }
    Duration::from_millis(1)
}
/// Returns how much time passed between `started_at` and `now`.
///
/// Yields [`Duration::ZERO`] when `now` is earlier than `started_at`, so
/// out-of-order instants never cause a panic or a negative result here.
fn elapsed_since(started_at: Instant, now: Instant) -> Duration {
    // The standard library already provides the saturating subtraction.
    now.saturating_duration_since(started_at)
}