use std::collections::HashMap;
use std::time::{Duration, Instant};
use super::fallback::{HandoffAttemptFailure, HandoffFallbackDecision, HandoffFallbackReason};
use super::handoff_token::{HandoffToken, HandoffTokenStore};
/// Default ACK deadline (5 seconds); this is the deadline `HandoffAckRegistry::new` uses.
pub const DEFAULT_HANDOFF_ACK_DEADLINE: Duration = Duration::from_secs(5);
/// Describes the backend associated with a pending handoff.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PendingHandoffBackend {
    /// Service name as supplied by the caller.
    pub service_name: String,
    /// Backend pid as supplied by the caller; `0` when built via [`Self::for_service`].
    pub backend_pid: u32,
}

impl PendingHandoffBackend {
    /// Builds a descriptor for `service_name` with `backend_pid` set to `0`.
    pub fn for_service(service_name: impl Into<String>) -> Self {
        Self::new(service_name, 0)
    }

    /// Builds a descriptor from a service name and an explicit backend pid.
    pub fn new(service_name: impl Into<String>, backend_pid: u32) -> Self {
        let service_name = service_name.into();
        Self {
            service_name,
            backend_pid,
        }
    }
}
/// Tracks handoff tokens that are waiting for a backend ACK, each with its own deadline.
#[derive(Debug)]
pub struct HandoffAckRegistry {
    /// Time added to the registration instant to compute an entry's deadline.
    ack_deadline: Duration,
    /// Outstanding entries keyed by handoff token.
    pending: HashMap<HandoffToken, PendingAckEntry>,
}

impl HandoffAckRegistry {
    /// Creates an empty registry that uses [`DEFAULT_HANDOFF_ACK_DEADLINE`].
    pub fn new() -> Self {
        Self::with_ack_deadline(DEFAULT_HANDOFF_ACK_DEADLINE)
    }

    /// Creates an empty registry with a custom ACK deadline.
    ///
    /// A zero `ack_deadline` is replaced by one millisecond.
    pub fn with_ack_deadline(ack_deadline: Duration) -> Self {
        let ack_deadline = match ack_deadline {
            requested if requested.is_zero() => Duration::from_millis(1),
            requested => requested,
        };
        Self {
            ack_deadline,
            pending: HashMap::new(),
        }
    }

    /// Returns the ACK deadline this registry applies to newly registered tokens.
    pub fn ack_deadline(&self) -> Duration {
        self.ack_deadline
    }

    /// Returns the number of entries currently pending.
    pub fn pending_len(&self) -> usize {
        self.pending.len()
    }

    /// Returns the backend recorded for `token`, or `None` if the token is not pending.
    pub fn pending_backend(&self, token: &HandoffToken) -> Option<&PendingHandoffBackend> {
        let entry = self.pending.get(token)?;
        Some(&entry.backend)
    }

    /// Records `token` as pending for `backend`, with a deadline of `now + ack_deadline`.
    ///
    /// Registering a token that is already pending replaces its previous entry.
    pub fn register(&mut self, token: HandoffToken, backend: PendingHandoffBackend, now: Instant) {
        // If `now + ack_deadline` is not representable, the deadline collapses to `now`.
        let ack_deadline_at = now.checked_add(self.ack_deadline).unwrap_or(now);
        let entry = PendingAckEntry {
            backend,
            issued_at: now,
            ack_deadline_at,
        };
        self.pending.insert(token, entry);
    }

    /// Processes a backend ACK for `token` observed at `now`.
    ///
    /// The pending entry is removed and the token is revoked in `tokens` whether the ACK
    /// was on time or late.
    ///
    /// # Errors
    ///
    /// * [`HandoffAckError::TokenNotPending`] if `token` has no pending entry; `tokens` is
    ///   left untouched in that case.
    /// * [`HandoffAckError::AckDeadlineExceeded`] if `now` is at or past the entry's deadline.
    pub fn acknowledge(
        &mut self,
        tokens: &mut HandoffTokenStore,
        token: &HandoffToken,
        now: Instant,
    ) -> Result<AcknowledgedHandoff, HandoffAckError> {
        let PendingAckEntry {
            backend,
            issued_at,
            ack_deadline_at,
        } = self
            .pending
            .remove(token)
            .ok_or(HandoffAckError::TokenNotPending)?;

        // Both outcomes below consume the token.
        tokens.revoke(token);

        if now < ack_deadline_at {
            Ok(AcknowledgedHandoff {
                token: *token,
                backend,
                waited: now.saturating_duration_since(issued_at),
            })
        } else {
            Err(HandoffAckError::AckDeadlineExceeded {
                backend,
                deadline: self.ack_deadline,
            })
        }
    }

    /// Drops any pending entry for `token` and revokes the token in `tokens`.
    ///
    /// The revoke happens even when no entry was pending. Returns the backend of the
    /// removed entry, if there was one.
    pub fn abandon(
        &mut self,
        tokens: &mut HandoffTokenStore,
        token: &HandoffToken,
    ) -> Option<PendingHandoffBackend> {
        let removed = self.pending.remove(token);
        tokens.revoke(token);
        removed.map(|PendingAckEntry { backend, .. }| backend)
    }

    /// Removes every entry whose deadline is at or before `now`, revoking each token.
    ///
    /// Returns one [`ExpiredHandoff`] per removed entry; the order follows the map's
    /// iteration order and is therefore unspecified.
    pub fn expire_overdue(
        &mut self,
        tokens: &mut HandoffTokenStore,
        now: Instant,
    ) -> Vec<ExpiredHandoff> {
        let deadline = self.ack_deadline;

        // Snapshot the overdue tokens first: the map cannot be mutated while iterating it.
        let overdue: Vec<HandoffToken> = self
            .pending
            .iter()
            .filter_map(|(token, entry)| (now >= entry.ack_deadline_at).then_some(*token))
            .collect();

        let mut expired = Vec::with_capacity(overdue.len());
        expired.extend(overdue.into_iter().filter_map(|token| {
            let entry = self.pending.remove(&token)?;
            tokens.revoke(&token);
            Some(ExpiredHandoff {
                token,
                backend: entry.backend,
                deadline,
            })
        }));
        expired
    }
}

impl Default for HandoffAckRegistry {
    /// Same as [`HandoffAckRegistry::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Successful outcome of `HandoffAckRegistry::acknowledge`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AcknowledgedHandoff {
/// The token that was acknowledged.
pub token: HandoffToken,
/// Backend that was recorded for the token when it was registered.
pub backend: PendingHandoffBackend,
/// Time from registration to the ACK, computed with a saturating subtraction.
pub waited: Duration,
}
/// A pending handoff removed by `HandoffAckRegistry::expire_overdue` because its ACK
/// deadline had been reached.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExpiredHandoff {
    /// The token that expired.
    pub token: HandoffToken,
    /// Backend that was recorded for the token when it was registered.
    pub backend: PendingHandoffBackend,
    /// The registry's configured ACK deadline at the time of expiry.
    pub deadline: Duration,
}

impl ExpiredHandoff {
    /// Returns the attempt failure for an expiry: always `BackendAckTimeout`.
    pub fn attempt_failure(&self) -> HandoffAttemptFailure {
        HandoffAttemptFailure::BackendAckTimeout
    }

    /// Builds a fallback decision whose reason is `BackendAckTimeout`.
    pub fn fallback_decision(&self) -> HandoffFallbackDecision {
        let reason = HandoffFallbackReason::BackendAckTimeout;
        HandoffFallbackDecision::new(reason)
    }
}
/// Reasons `HandoffAckRegistry::acknowledge` can reject an ACK.
#[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
pub enum HandoffAckError {
    /// The token has no pending entry in the registry.
    #[error("handoff ACK token is not pending")]
    TokenNotPending,
    /// The ACK was observed at or after the entry's deadline.
    #[error("backend ACK deadline ({deadline:?}) exceeded for {backend:?}")]
    AckDeadlineExceeded {
        /// Backend that was recorded for the token.
        backend: PendingHandoffBackend,
        /// The registry's configured ACK deadline.
        deadline: Duration,
    },
}

impl HandoffAckError {
    /// Maps this error to an attempt failure, if it has one.
    ///
    /// Only a missed deadline maps to a failure (`BackendAckTimeout`); a token that is
    /// not pending yields `None`.
    pub fn attempt_failure(&self) -> Option<HandoffAttemptFailure> {
        // Kept exhaustive so that a new variant forces a decision here.
        match self {
            Self::AckDeadlineExceeded {
                backend: _,
                deadline: _,
            } => Some(HandoffAttemptFailure::BackendAckTimeout),
            Self::TokenNotPending => None,
        }
    }
}
/// Bookkeeping stored for each pending token in `HandoffAckRegistry`.
#[derive(Clone, Debug)]
struct PendingAckEntry {
/// Backend supplied when the token was registered.
backend: PendingHandoffBackend,
/// The `now` value passed to `register`.
issued_at: Instant,
/// Instant at or after which the entry counts as overdue.
ack_deadline_at: Instant,
}