use std::collections::VecDeque;
use std::time::{Duration, Instant};
use super::{HandoffAttemptDecision, HandoffFallbackDecision, HandoffFallbackReason};
pub const DEFAULT_MAX_PENDING_HANDOFFS: usize = 64;
pub const DEFAULT_PENDING_HANDOFF_TTL: Duration = Duration::from_millis(100);
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct PendingHandoffQueueConfig {
pub max_pending_handoffs: usize,
pub pending_ttl: Duration,
}
impl PendingHandoffQueueConfig {
pub fn new(max_pending_handoffs: usize, pending_ttl: Duration) -> Self {
Self {
max_pending_handoffs: max_pending_handoffs.max(1),
pending_ttl: if pending_ttl.is_zero() {
Duration::from_millis(1)
} else {
pending_ttl
},
}
}
}
impl Default for PendingHandoffQueueConfig {
fn default() -> Self {
Self {
max_pending_handoffs: DEFAULT_MAX_PENDING_HANDOFFS,
pending_ttl: DEFAULT_PENDING_HANDOFF_TTL,
}
}
}
#[derive(Debug)]
pub struct PendingHandoffQueue<T> {
config: PendingHandoffQueueConfig,
queue: VecDeque<PendingHandoffEntry<T>>,
}
impl<T> PendingHandoffQueue<T> {
pub fn new() -> Self {
Self::with_config(PendingHandoffQueueConfig::default())
}
pub fn with_config(config: PendingHandoffQueueConfig) -> Self {
Self {
config,
queue: VecDeque::with_capacity(config.max_pending_handoffs),
}
}
pub fn config(&self) -> PendingHandoffQueueConfig {
self.config
}
pub fn pending_len(&self) -> usize {
self.queue.len()
}
pub fn is_empty(&self) -> bool {
self.queue.is_empty()
}
pub fn enqueue(&mut self, handoff: T, now: Instant) -> Result<(), PendingHandoffOverflow> {
self.expire(now);
if self.queue.len() >= self.config.max_pending_handoffs {
return Err(PendingHandoffOverflow {
max_pending_handoffs: self.config.max_pending_handoffs,
});
}
self.queue.push_back(PendingHandoffEntry {
handoff,
expires_at: expires_at(now, self.config.pending_ttl),
});
Ok(())
}
pub fn dequeue(&mut self, now: Instant) -> Option<T> {
self.expire(now);
self.queue.pop_front().map(|entry| entry.handoff)
}
pub fn expire(&mut self, now: Instant) -> usize {
let before = self.queue.len();
self.queue.retain(|entry| now < entry.expires_at);
before - self.queue.len()
}
}
impl<T> Default for PendingHandoffQueue<T> {
fn default() -> Self {
Self::new()
}
}
#[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
#[error("pending handoff queue full ({max_pending_handoffs})")]
pub struct PendingHandoffOverflow {
pub max_pending_handoffs: usize,
}
impl PendingHandoffOverflow {
pub fn fallback_reason(&self) -> HandoffFallbackReason {
HandoffFallbackReason::FdPressureDisabled
}
pub fn fallback_decision(&self) -> HandoffFallbackDecision {
HandoffFallbackDecision::new(self.fallback_reason())
}
pub fn fallback_attempt_decision(&self) -> HandoffAttemptDecision {
HandoffAttemptDecision::FallbackToReconnect(self.fallback_decision())
}
pub fn is_fallback_safe(&self) -> bool {
let fallback = self.fallback_decision();
fallback.uses_backend_reconnect() && !fallback.sends_client_error()
}
}
#[derive(Debug)]
struct PendingHandoffEntry<T> {
handoff: T,
expires_at: Instant,
}
fn expires_at(now: Instant, ttl: Duration) -> Instant {
now.checked_add(ttl).unwrap_or(now)
}