use crate::connection::PgReplicationConnection;
use crate::error::{ReplicationError, Result};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tracing::{debug, error, info};
/// Settings that control how connection attempts are retried and how the
/// delay between attempts grows.
#[derive(Debug, Copy, Clone)]
pub struct RetryConfig {
/// Upper bound on the number of connection attempts made by
/// `ReplicationConnectionRetry::connect_with_retry`.
pub max_attempts: u32,
/// Delay the backoff schedule starts from.
pub initial_delay: Duration,
/// Ceiling the growing (un-jittered) delay is clamped to.
pub max_delay: Duration,
/// Factor the current delay is multiplied by after every
/// `ExponentialBackoff::next_delay` call.
pub multiplier: f64,
/// Overall time budget: `connect_with_retry` stops starting new attempts
/// once this much time has elapsed since it was called.
pub max_duration: Duration,
/// When `true`, every delay handed out is randomized by up to ±30%.
pub jitter: bool,
}
impl Default for RetryConfig {
fn default() -> Self {
Self {
max_attempts: 5,
initial_delay: Duration::from_secs(1),
max_delay: Duration::from_secs(60),
multiplier: 2.0,
max_duration: Duration::from_secs(300),
jitter: true,
}
}
}
/// Stateful exponential backoff schedule built from a `RetryConfig`.
///
/// Each `next_delay` call hands out the current delay and then multiplies it
/// for the following call.
#[derive(Debug, Clone)]
pub struct ExponentialBackoff {
/// Starting delay copied from `RetryConfig::initial_delay`; used by `reset`
/// to restart the schedule.
initial_delay: Duration,
/// Ceiling applied to the delay each time it grows.
max_delay: Duration,
/// Growth factor applied to the delay after each `next_delay` call.
multiplier: f64,
/// Whether `next_delay` randomizes the delay it returns.
jitter: bool,
/// Un-jittered delay the next `next_delay` call will hand out.
current_delay: Duration,
/// Number of `next_delay` calls since creation or the last `reset`.
attempt: u32,
}
impl ExponentialBackoff {
    /// Creates a backoff schedule from `config`.
    ///
    /// The schedule starts at `config.initial_delay`, clamped to
    /// `config.max_delay` so that no un-jittered delay — including the very
    /// first one — exceeds the configured ceiling.
    pub fn new(config: &RetryConfig) -> Self {
        Self {
            initial_delay: config.initial_delay,
            max_delay: config.max_delay,
            multiplier: config.multiplier,
            jitter: config.jitter,
            current_delay: config.initial_delay.min(config.max_delay),
            attempt: 0,
        }
    }

    /// Returns the delay to wait before the next attempt and advances the
    /// schedule.
    ///
    /// The stored delay is multiplied by `multiplier` (computed at millisecond
    /// precision) and clamped to `max_delay` for the following call, and the
    /// attempt counter is incremented. When jitter is enabled the returned
    /// value is randomized by up to ±30%, so it may exceed `max_delay` by up
    /// to 30%.
    pub fn next_delay(&mut self) -> Duration {
        let delay = self.current_delay;

        // Float-to-integer `as` casts saturate (NaN and negative values
        // become 0), so an unusual multiplier cannot panic here.
        let next_delay_ms = (self.current_delay.as_millis() as f64 * self.multiplier) as u64;
        self.current_delay = Duration::from_millis(next_delay_ms).min(self.max_delay);

        // Saturate rather than overflow if the schedule is polled for a very
        // long time without a reset.
        self.attempt = self.attempt.saturating_add(1);

        if self.jitter {
            self.add_jitter(delay)
        } else {
            delay
        }
    }

    /// Randomizes `delay` by up to ±30%, at millisecond precision.
    ///
    /// The random fraction is produced by hashing the current sub-second
    /// clock reading and the attempt counter with a freshly created
    /// `RandomState`. Taking the low decimal digits of the nanosecond clock
    /// directly is not enough: the precision of `SystemTime` is platform
    /// dependent, and on coarse clocks those digits barely change, which
    /// would turn the jitter into a fixed offset.
    fn add_jitter(&self, delay: Duration) -> Duration {
        const JITTER_FACTOR: f64 = 0.3;

        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .subsec_nanos();

        // Each `RandomState` carries its own randomly generated keys, so the
        // hash is well spread even when `nanos` is identical across calls.
        let state = std::collections::hash_map::RandomState::new();
        let mut hasher = std::hash::BuildHasher::build_hasher(&state);
        std::hash::Hasher::write_u32(&mut hasher, nanos);
        std::hash::Hasher::write_u32(&mut hasher, self.attempt);
        let bits = std::hash::Hasher::finish(&hasher);

        // Keep the top 53 bits (the width of an f64 mantissa) to obtain a
        // fraction in [0, 1).
        let unit = (bits >> 11) as f64 / (1u64 << 53) as f64;

        let base_millis = delay.as_millis() as f64;
        let jitter_range = base_millis * JITTER_FACTOR;
        // Map [0, 1) onto [-jitter_range, +jitter_range).
        let jitter_adjustment = (unit - 0.5) * 2.0 * jitter_range;
        let final_millis = (base_millis + jitter_adjustment).max(0.0) as u64;
        Duration::from_millis(final_millis)
    }

    /// Restarts the schedule: the delay goes back to the (clamped) initial
    /// delay and the attempt counter to zero.
    pub fn reset(&mut self) {
        self.current_delay = self.initial_delay.min(self.max_delay);
        self.attempt = 0;
    }

    /// Number of delays handed out since creation or the last `reset`.
    pub fn attempt(&self) -> u32 {
        self.attempt
    }
}
impl RetryConfig {
/// Builds a fresh `ExponentialBackoff` schedule from this configuration.
///
/// Equivalent to `ExponentialBackoff::new(self)`; the returned schedule
/// starts with an attempt count of zero.
pub fn to_backoff(&self) -> ExponentialBackoff {
ExponentialBackoff::new(self)
}
}
/// Opens a `PgReplicationConnection`, retrying failed attempts according to a
/// `RetryConfig`.
pub struct ReplicationConnectionRetry {
/// Attempt limit, time budget and backoff settings used by
/// `connect_with_retry`.
config: RetryConfig,
/// Connection string passed unchanged to `PgReplicationConnection::connect`.
connection_string: String,
}
impl ReplicationConnectionRetry {
    /// Creates a retry helper that connects with `connection_string` under
    /// the limits described by `config`.
    pub fn new(config: RetryConfig, connection_string: String) -> Self {
        Self {
            config,
            connection_string,
        }
    }

    /// Tries to open a replication connection, retrying with exponential
    /// backoff between failed attempts.
    ///
    /// At most `config.max_attempts` attempts are made, and no attempt is
    /// started once `config.max_duration` has elapsed since this call began.
    ///
    /// # Errors
    ///
    /// * A connection error stating that the maximum duration was exceeded
    ///   when the time budget runs out before an attempt succeeds. This is
    ///   reported as soon as the next backoff delay can no longer fit inside
    ///   the budget, instead of sleeping through that delay first.
    /// * The error of the final attempt when all `max_attempts` attempts fail.
    /// * A generic connection error when `max_attempts` is zero, because no
    ///   attempt is made at all.
    pub async fn connect_with_retry(&self) -> Result<PgReplicationConnection> {
        let start_time = Instant::now();
        let mut backoff = self.config.to_backoff();
        info!("Attempting to connect to PostgreSQL with retry logic");

        for attempt in 1..=self.config.max_attempts {
            debug!(
                "Attempting PostgreSQL connection (attempt {}/{})",
                attempt, self.config.max_attempts
            );

            if start_time.elapsed() >= self.config.max_duration {
                return self.max_duration_exceeded();
            }

            // NOTE(review): `connect` is called without `.await`; if it blocks
            // on network I/O it will occupy the executor thread — confirm.
            match PgReplicationConnection::connect(&self.connection_string) {
                Ok(conn) => {
                    let elapsed = start_time.elapsed();
                    info!(
                        "Successfully connected to PostgreSQL on attempt {} after {:?}",
                        attempt, elapsed
                    );
                    return Ok(conn);
                }
                Err(e) => {
                    let error_msg = e.to_string();
                    error!("Connection attempt {} failed: {}", attempt, error_msg);

                    if attempt >= self.config.max_attempts {
                        let elapsed = start_time.elapsed();
                        error!("All connection attempts failed after {:?}", elapsed);
                        return Err(e);
                    }

                    let delay = backoff.next_delay();

                    // Sleeping for `delay` would carry us to or past the
                    // deadline, so the check at the top of the next iteration
                    // is certain to fail. Report that now rather than making
                    // the caller wait for a foregone conclusion.
                    if start_time.elapsed().saturating_add(delay) >= self.config.max_duration {
                        return self.max_duration_exceeded();
                    }

                    debug!("Waiting {:?} before next attempt", delay);
                    tokio::time::sleep(delay).await;
                }
            }
        }

        // Only reachable when `max_attempts` is zero: every iteration of the
        // loop above ends in a `return` or continues to the next attempt.
        let elapsed = start_time.elapsed();
        error!("Connection failed after all attempts and {:?}", elapsed);
        Err(ReplicationError::connection(
            "Connection failed after all retry attempts".to_string(),
        ))
    }

    /// Logs and returns the error used when `config.max_duration` is used up.
    fn max_duration_exceeded(&self) -> Result<PgReplicationConnection> {
        error!(
            "Connection attempts exceeded maximum duration ({:?})",
            self.config.max_duration
        );
        Err(ReplicationError::connection(format!(
            "Connection attempts exceeded maximum duration of {:?}",
            self.config.max_duration
        )))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline configuration the tests derive their variants from:
    /// 5 attempts, 100 ms initial delay doubling up to 60 s, a 300 s budget
    /// and jitter disabled.
    fn base_config() -> RetryConfig {
        RetryConfig {
            max_attempts: 5,
            initial_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(60),
            multiplier: 2.0,
            max_duration: Duration::from_secs(300),
            jitter: false,
        }
    }

    /// Delays double and the attempt counter advances by one per call.
    #[test]
    fn test_exponential_backoff_basic() {
        let mut backoff = ExponentialBackoff::new(&base_config());
        for (expected_attempt, expected_ms) in [(1u32, 100u64), (2, 200), (3, 400)] {
            assert_eq!(backoff.next_delay(), Duration::from_millis(expected_ms));
            assert_eq!(backoff.attempt(), expected_attempt);
        }
    }

    /// The fourth delay must not exceed the 500 ms ceiling.
    #[test]
    fn test_exponential_backoff_max_delay() {
        let config = RetryConfig {
            max_attempts: 10,
            max_delay: Duration::from_millis(500),
            ..base_config()
        };
        let mut backoff = ExponentialBackoff::new(&config);
        // Discard the first three delays; only the fourth is checked.
        for _ in 0..3 {
            backoff.next_delay();
        }
        assert!(backoff.next_delay() <= Duration::from_millis(500));
    }

    /// `reset` puts both the counter and the delay back to their start values.
    #[test]
    fn test_exponential_backoff_reset() {
        let mut backoff = ExponentialBackoff::new(&base_config());
        for _ in 0..2 {
            backoff.next_delay();
        }
        assert_eq!(backoff.attempt(), 2);

        backoff.reset();
        assert_eq!(backoff.attempt(), 0);

        assert_eq!(backoff.next_delay(), Duration::from_millis(100));
        assert_eq!(backoff.attempt(), 1);
    }

    /// Consecutive delays differ when jitter is enabled.
    ///
    /// NOTE(review): the base delay doubles on every call, so consecutive
    /// delays would differ here even with jitter disabled; this check does
    /// not isolate the jitter itself.
    #[test]
    fn test_exponential_backoff_jitter() {
        let config = RetryConfig {
            jitter: true,
            ..base_config()
        };
        let mut backoff = ExponentialBackoff::new(&config);
        let mut previous = backoff.next_delay();
        // `any` stops at the first differing delay, like an early `break`.
        let saw_variation = (0..5).any(|_| {
            let current = backoff.next_delay();
            let changed = current != previous;
            previous = current;
            changed
        });
        assert!(saw_variation, "Expected jitter to vary delays");
    }

    /// After `max_attempts` calls the counter equals `max_attempts`.
    #[test]
    fn test_max_attempts() {
        let config = RetryConfig {
            max_attempts: 3,
            ..base_config()
        };
        let mut backoff = ExponentialBackoff::new(&config);
        for expected_attempt in 1..=3 {
            backoff.next_delay();
            assert_eq!(backoff.attempt(), expected_attempt);
        }
        assert_eq!(backoff.attempt(), config.max_attempts);
    }

    /// A multiplier of 3 triples the delay on each call.
    #[test]
    fn test_multiplier_effect() {
        let config = RetryConfig {
            initial_delay: Duration::from_millis(10),
            multiplier: 3.0,
            ..base_config()
        };
        let mut backoff = ExponentialBackoff::new(&config);
        for expected_ms in [10u64, 30, 90] {
            assert_eq!(backoff.next_delay(), Duration::from_millis(expected_ms));
        }
    }

    /// Pins the documented default values.
    #[test]
    fn test_retry_config_default() {
        let RetryConfig {
            max_attempts,
            initial_delay,
            max_delay,
            multiplier,
            ..
        } = RetryConfig::default();
        assert_eq!(max_attempts, 5);
        assert_eq!(initial_delay, Duration::from_secs(1));
        assert_eq!(max_delay, Duration::from_secs(60));
        assert_eq!(multiplier, 2.0);
    }

    /// The stored delay starts at the initial delay and doubles after a call.
    #[test]
    fn test_backoff_current_delay_field() {
        let mut backoff = ExponentialBackoff::new(&base_config());
        assert_eq!(backoff.current_delay, Duration::from_millis(100));
        backoff.next_delay();
        assert_eq!(backoff.current_delay, Duration::from_millis(200));
    }

    /// `reset` restores the internal fields, not just the observable output.
    #[test]
    fn test_reset_clears_state() {
        let mut backoff = ExponentialBackoff::new(&base_config());
        for _ in 0..2 {
            backoff.next_delay();
        }
        backoff.reset();
        assert_eq!(backoff.attempt(), 0);
        assert_eq!(backoff.current_delay, Duration::from_millis(100));
    }

    /// `to_backoff` yields an untouched schedule based on the config.
    #[test]
    fn test_to_backoff() {
        let config = RetryConfig::default();
        let backoff = config.to_backoff();
        assert_eq!(backoff.attempt(), 0);
        assert_eq!(backoff.current_delay, config.initial_delay);
    }

    /// The constructor stores the connection string unchanged.
    #[test]
    fn test_replication_connection_retry_new() {
        let connection_string = "postgresql://localhost/test?replication=database".to_string();
        let retry = ReplicationConnectionRetry::new(RetryConfig::default(), connection_string);
        assert_eq!(
            retry.connection_string,
            "postgresql://localhost/test?replication=database"
        );
    }

    /// With a constant 1000 ms base, jittered delays stay within ±30%.
    #[test]
    fn test_jitter_stays_within_bounds() {
        let config = RetryConfig {
            max_attempts: 50,
            initial_delay: Duration::from_millis(1000),
            multiplier: 1.0,
            jitter: true,
            ..base_config()
        };
        let mut backoff = ExponentialBackoff::new(&config);
        for _ in 0..20 {
            let millis = backoff.next_delay().as_millis() as f64;
            assert!(
                (700.0..=1300.0).contains(&millis),
                "Jitter delay {millis}ms is outside ±30% of 1000ms"
            );
            // Pin the base delay so every iteration jitters around 1000 ms.
            backoff.current_delay = Duration::from_millis(1000);
        }
    }

    /// `RetryConfig` can be copied and has a `Debug` representation.
    #[test]
    fn test_retry_config_clone_and_debug() {
        let config = RetryConfig::default();
        let cloned = config;
        assert_eq!(cloned.max_attempts, config.max_attempts);
        assert_eq!(cloned.initial_delay, config.initial_delay);
        assert_eq!(cloned.max_delay, config.max_delay);

        let debug_str = format!("{:?}", config);
        assert!(debug_str.contains("RetryConfig"));
    }

    /// `ExponentialBackoff` can be cloned and has a `Debug` representation.
    #[test]
    fn test_exponential_backoff_clone_debug() {
        let backoff = ExponentialBackoff::new(&RetryConfig::default());
        let cloned = backoff.clone();
        assert_eq!(cloned.attempt(), backoff.attempt());

        let debug_str = format!("{:?}", backoff);
        assert!(debug_str.contains("ExponentialBackoff"));
    }

    /// Jittered delays never exceed the ceiling by more than 30% (+1 ms slack).
    #[test]
    fn test_max_delay_cap_with_jitter() {
        let config = RetryConfig {
            max_attempts: 20,
            initial_delay: Duration::from_secs(30),
            jitter: true,
            ..base_config()
        };
        let mut backoff = ExponentialBackoff::new(&config);
        let max_allowed = Duration::from_secs(60).as_millis() as f64 * 1.3;
        for _ in 0..10 {
            let delay = backoff.next_delay();
            assert!(
                (delay.as_millis() as f64) <= max_allowed + 1.0,
                "Delay {:?} exceeds max allowed with jitter",
                delay
            );
        }
    }

    /// Jitter on a 1 ms delay stays tiny.
    #[test]
    fn test_add_jitter_small_delay() {
        let config = RetryConfig {
            initial_delay: Duration::from_millis(1),
            multiplier: 1.0,
            jitter: true,
            ..base_config()
        };
        let mut backoff = ExponentialBackoff::new(&config);
        assert!(backoff.next_delay().as_millis() <= 2);
    }

    /// 50 s doubles past the 60 s ceiling and then stays clamped to it.
    #[test]
    fn test_backoff_max_delay_capping() {
        let config = RetryConfig {
            max_attempts: 10,
            initial_delay: Duration::from_secs(50),
            ..base_config()
        };
        let mut backoff = ExponentialBackoff::new(&config);
        for expected_secs in [50u64, 60, 60] {
            assert_eq!(backoff.next_delay(), Duration::from_secs(expected_secs));
        }
    }
}