use crate::clients::disconnect::DisconnectReason;
use rand::Rng;
use std::fmt;
use std::time::Duration;
use tokio::time::Instant;
/// State of the reconnect circuit breaker.
///
/// The `Display` form is the lowercase snake_case label (`closed`, `open`,
/// `half_open`), which is what ends up in structured log fields.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitState {
    /// Normal operation: failures are handled with exponential backoff.
    Closed,
    /// Reconnect attempts are suppressed until a cooldown deadline passes.
    Open,
    /// The cooldown has elapsed and a single trial reconnect is in flight.
    HalfOpen,
}

impl fmt::Display for CircuitState {
    /// Writes the state's lowercase label to the formatter.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match *self {
            Self::Closed => "closed",
            Self::Open => "open",
            Self::HalfOpen => "half_open",
        };
        f.write_str(label)
    }
}
/// Decision returned by `ReconnectPolicy::next_action` after a disconnect.
#[derive(Debug)]
pub enum ReconnectAction {
/// Retry after the contained delay (backoff plus jitter, or the initial
/// delay when the circuit moves from open to half-open).
RetryAfter(Duration),
/// Retry without waiting; returned when the disconnect reason's suggested
/// delay factor is exactly `0.0`.
RetryImmediately,
/// Do not retry; returned when the disconnect reason is not retryable.
/// `reason` holds the `Display` rendering of that disconnect reason.
GiveUp { reason: String },
/// The circuit breaker is open; `until` is the instant at which the current
/// cooldown ends.
CircuitOpen { until: Instant },
}
/// Exponential-backoff reconnect policy combined with a circuit breaker.
///
/// Constructed through `ReconnectPolicy::builder()`; all fields are private
/// and mutated by `next_action`, `on_connected` and `on_message_received`.
pub struct ReconnectPolicy {
// Delay that `current_delay` is reset to, and the delay returned when the
// circuit moves from open to half-open.
initial_delay: Duration,
// Upper bound for `current_delay` after doubling; cooldowns are derived from
// it (4x initially, capped at 8x).
max_delay: Duration,
// Consecutive-failure count at which the circuit opens; `None` means the
// circuit is never opened by failure count.
max_attempts: Option<u32>,
// Upper bound of the random jitter, as a fraction of the base delay.
jitter_factor: f64,
// Base delay for the next backoff; doubled after every backoff decision.
current_delay: Duration,
// Failures counted on the backoff path since the last `on_connected`.
consecutive_failures: u32,
// Current circuit-breaker state.
circuit_state: CircuitState,
// Instant at which the current cooldown ends; `None` when no cooldown is set.
cooldown_until: Option<Instant>,
// Length of the cooldown applied when the circuit opens; doubled each time a
// half-open trial fails.
cooldown_duration: Duration,
}
impl ReconnectPolicy {
pub fn builder() -> ReconnectPolicyBuilder {
ReconnectPolicyBuilder::default()
}
pub fn consecutive_failures(&self) -> u32 {
self.consecutive_failures
}
pub fn circuit_state(&self) -> CircuitState {
self.circuit_state
}
pub fn next_action(&mut self, reason: &DisconnectReason) -> ReconnectAction {
if !reason.is_retryable() {
tracing::error!(
reason = %reason,
"reconnect.non_retryable"
);
return ReconnectAction::GiveUp {
reason: format!("{reason}"),
};
}
match self.circuit_state {
CircuitState::Open => {
if let Some(until) = self.cooldown_until {
if Instant::now() < until {
tracing::error!(
cooldown_remaining_ms = (until - Instant::now()).as_millis() as u64,
circuit = %self.circuit_state,
"reconnect.circuit_open"
);
return ReconnectAction::CircuitOpen { until };
}
}
self.circuit_state = CircuitState::HalfOpen;
tracing::info!("reconnect.circuit_half_open");
return ReconnectAction::RetryAfter(self.initial_delay);
}
CircuitState::HalfOpen => {
self.cooldown_duration =
(self.cooldown_duration * 2).min(self.max_delay * 8);
self.cooldown_until = Some(Instant::now() + self.cooldown_duration);
self.circuit_state = CircuitState::Open;
tracing::error!(
cooldown_ms = self.cooldown_duration.as_millis() as u64,
"reconnect.circuit_reopen"
);
return ReconnectAction::CircuitOpen {
until: self.cooldown_until.unwrap(),
};
}
CircuitState::Closed => { }
}
let factor = reason.suggested_delay_factor();
if factor == 0.0 {
tracing::info!(reason = %reason, "reconnect.retry_immediately");
return ReconnectAction::RetryImmediately;
}
self.consecutive_failures += 1;
if let Some(max) = self.max_attempts {
if self.consecutive_failures >= max {
self.cooldown_duration = self.max_delay * 4;
self.cooldown_until = Some(Instant::now() + self.cooldown_duration);
self.circuit_state = CircuitState::Open;
tracing::error!(
attempts = self.consecutive_failures,
max_attempts = max,
cooldown_ms = self.cooldown_duration.as_millis() as u64,
circuit = %self.circuit_state,
"reconnect.circuit_open"
);
return ReconnectAction::CircuitOpen {
until: self.cooldown_until.unwrap(),
};
}
}
let base_ms = self.current_delay.as_millis() as f64 * factor;
let jitter_ms = if self.jitter_factor > 0.0 {
rand::rng().random_range(0.0..base_ms * self.jitter_factor)
} else {
0.0
};
let delay = Duration::from_millis((base_ms + jitter_ms) as u64);
tracing::warn!(
attempts = self.consecutive_failures,
delay_ms = delay.as_millis() as u64,
base_ms = base_ms as u64,
jitter_ms = jitter_ms as u64,
reason = %reason,
"reconnect.backoff"
);
self.current_delay = (self.current_delay * 2).min(self.max_delay);
ReconnectAction::RetryAfter(delay)
}
pub fn on_connected(&mut self) {
self.consecutive_failures = 0;
self.current_delay = self.initial_delay;
self.cooldown_until = None;
self.cooldown_duration = self.max_delay * 4;
if self.circuit_state != CircuitState::Closed {
tracing::info!(
prev_state = %self.circuit_state,
"reconnect.circuit_closed"
);
self.circuit_state = CircuitState::Closed;
}
}
pub fn on_message_received(&mut self) {
self.current_delay = self.initial_delay;
}
}
/// Builder for `ReconnectPolicy`.
///
/// The `Default` configuration is a 1 s initial delay, a 30 s maximum delay,
/// no attempt limit and a jitter factor of `0.5`.
pub struct ReconnectPolicyBuilder {
    /// First backoff delay.
    initial_delay: Duration,
    /// Upper bound for the backoff delay.
    max_delay: Duration,
    /// Failure count that opens the circuit; `None` disables the limit.
    max_attempts: Option<u32>,
    /// Maximum jitter as a fraction of the base delay.
    jitter_factor: f64,
}

impl Default for ReconnectPolicyBuilder {
    /// Returns the builder with its stock settings.
    fn default() -> Self {
        let initial_delay = Duration::from_secs(1);
        let max_delay = Duration::from_secs(30);
        Self {
            initial_delay,
            max_delay,
            max_attempts: None,
            jitter_factor: 0.5,
        }
    }
}
impl ReconnectPolicyBuilder {
    /// Sets the first backoff delay.
    pub fn initial_delay(mut self, d: Duration) -> Self {
        self.initial_delay = d;
        self
    }

    /// Sets the upper bound for the backoff delay.
    pub fn max_delay(mut self, d: Duration) -> Self {
        self.max_delay = d;
        self
    }

    /// Sets the consecutive-failure count that opens the circuit; `None`
    /// disables the limit.
    pub fn max_attempts(mut self, n: Option<u32>) -> Self {
        self.max_attempts = n;
        self
    }

    /// Sets the jitter factor. Negative values and NaN are stored as `0.0`
    /// (`f64::max` returns the non-NaN operand).
    pub fn jitter_factor(mut self, f: f64) -> Self {
        self.jitter_factor = f.max(0.0);
        self
    }

    /// Builds the policy with a closed circuit, no recorded failures and a
    /// cooldown of `4 * max_delay`.
    ///
    /// # Panics
    ///
    /// Panics if `initial_delay` is greater than `max_delay`.
    pub fn build(self) -> ReconnectPolicy {
        assert!(
            self.initial_delay <= self.max_delay,
            "initial_delay ({:?}) must be <= max_delay ({:?})",
            self.initial_delay,
            self.max_delay,
        );
        ReconnectPolicy {
            initial_delay: self.initial_delay,
            max_delay: self.max_delay,
            max_attempts: self.max_attempts,
            jitter_factor: self.jitter_factor,
            current_delay: self.initial_delay,
            consecutive_failures: 0,
            circuit_state: CircuitState::Closed,
            cooldown_until: None,
            // `Duration * u32` panics on overflow; saturate so a very large
            // `max_delay` (e.g. `Duration::MAX` as "no cap") still builds.
            cooldown_duration: self.max_delay.saturating_mul(4),
        }
    }
}
/// Tracks the time of the most recent activity and reports staleness once a
/// fixed timeout has elapsed without any.
pub struct HealthMonitor {
    /// Maximum allowed gap between recorded activities.
    timeout: Duration,
    /// Instant of the most recently recorded activity.
    last_activity: Instant,
}

impl HealthMonitor {
    /// Creates a monitor whose last activity is "now".
    pub fn new(timeout: Duration) -> Self {
        let last_activity = Instant::now();
        Self {
            timeout,
            last_activity,
        }
    }

    /// Marks the current instant as the most recent activity.
    pub fn record_activity(&mut self) {
        self.last_activity = Instant::now();
    }

    /// Instant after which the monitor is considered stale.
    pub fn deadline(&self) -> Instant {
        self.last_activity + self.timeout
    }

    /// Returns `true` when the current time is strictly past the deadline.
    pub fn is_stale(&self) -> bool {
        let deadline = self.deadline();
        deadline < Instant::now()
    }

    /// The configured staleness timeout.
    pub fn timeout(&self) -> Duration {
        self.timeout
    }
}
/// Health-related configuration exposed by a connection type.
pub trait ConnectionHealth {
    /// How long a connection may stay silent before it counts as stale.
    ///
    /// The default is 90 seconds; implementors may override it.
    fn staleness_timeout(&self) -> Duration {
        const DEFAULT_STALENESS_SECS: u64 = 90;
        Duration::from_secs(DEFAULT_STALENESS_SECS)
    }
}