1use crate::connection::PgReplicationConnection;
7use crate::error::{ReplicationError, Result};
8use std::time::{Duration, Instant};
9use tracing::{debug, error, info};
10
/// Tunable parameters for connection retries with exponential backoff.
///
/// The struct is plain data: it is `Copy`, and every field is public so
/// callers can build it with a struct literal or start from
/// [`RetryConfig::default`].
#[derive(Clone, Copy, Debug)]
pub struct RetryConfig {
    /// Upper bound on the number of connection attempts.
    pub max_attempts: u32,
    /// Delay handed out before the first retry.
    pub initial_delay: Duration,
    /// Cap applied to the (un-jittered) delay as it grows.
    pub max_delay: Duration,
    /// Factor the delay is multiplied by after each retry.
    pub multiplier: f64,
    /// Total time budget across all attempts.
    pub max_duration: Duration,
    /// Whether delays are randomly perturbed (by up to 30% either way).
    pub jitter: bool,
}
50
51impl Default for RetryConfig {
52 fn default() -> Self {
53 Self {
54 max_attempts: 5,
55 initial_delay: Duration::from_secs(1),
56 max_delay: Duration::from_secs(60),
57 multiplier: 2.0,
58 max_duration: Duration::from_secs(300),
59 jitter: true,
60 }
61 }
62}
63
/// Stateful generator of retry delays that grow geometrically.
///
/// The policy fields are copied from a [`RetryConfig`]; `current_delay` and
/// `attempt` are the mutable state advanced by each call to `next_delay`.
#[derive(Clone, Debug)]
pub struct ExponentialBackoff {
    /// Delay restored by `reset`.
    initial_delay: Duration,
    /// Ceiling for `current_delay`.
    max_delay: Duration,
    /// Growth factor applied to `current_delay` after each step.
    multiplier: f64,
    /// Whether returned delays are randomly perturbed.
    jitter: bool,
    /// Base delay that the next call to `next_delay` will hand out.
    current_delay: Duration,
    /// Number of delays handed out since construction or the last reset.
    attempt: u32,
}
74
75impl ExponentialBackoff {
76 pub fn new(config: &RetryConfig) -> Self {
99 Self {
100 initial_delay: config.initial_delay,
101 max_delay: config.max_delay,
102 multiplier: config.multiplier,
103 jitter: config.jitter,
104 current_delay: config.initial_delay,
105 attempt: 0,
106 }
107 }
108
109 pub fn next_delay(&mut self) -> Duration {
140 let delay = self.current_delay;
141
142 let next_delay_ms = (self.current_delay.as_millis() as f64 * self.multiplier) as u64;
144 self.current_delay = Duration::from_millis(next_delay_ms).min(self.max_delay);
145 self.attempt += 1;
146
147 if self.jitter {
149 self.add_jitter(delay)
150 } else {
151 delay
152 }
153 }
154
155 fn add_jitter(&self, delay: Duration) -> Duration {
157 let now = Instant::now();
160 let nanos = now.elapsed().subsec_nanos();
161 let jitter_factor = 0.3;
162
163 let base_millis = delay.as_millis() as f64;
165 let jitter_range = base_millis * jitter_factor;
166 let jitter = (nanos % 1000) as f64 / 1000.0; let jitter_adjustment = (jitter - 0.5) * 2.0 * jitter_range; let final_millis = (base_millis + jitter_adjustment).max(0.0) as u64;
170 Duration::from_millis(final_millis)
171 }
172
173 pub fn reset(&mut self) {
175 self.current_delay = self.initial_delay;
176 self.attempt = 0;
177 }
178
179 pub fn attempt(&self) -> u32 {
181 self.attempt
182 }
183}
184
185impl RetryConfig {
186 pub fn to_backoff(&self) -> ExponentialBackoff {
188 ExponentialBackoff::new(self)
189 }
190}
191
192pub struct ReplicationConnectionRetry {
194 config: RetryConfig,
195 connection_string: String,
196}
197
198impl ReplicationConnectionRetry {
199 pub fn new(config: RetryConfig, connection_string: String) -> Self {
201 Self {
202 config,
203 connection_string,
204 }
205 }
206
207 pub async fn connect_with_retry(&self) -> Result<PgReplicationConnection> {
240 let start_time = Instant::now();
241 let mut backoff = self.config.to_backoff();
242
243 info!("Attempting to connect to PostgreSQL with retry logic");
244
245 for attempt in 1..=self.config.max_attempts {
246 debug!(
247 "Attempting PostgreSQL connection (attempt {}/{})",
248 attempt, self.config.max_attempts
249 );
250
251 if start_time.elapsed() >= self.config.max_duration {
253 error!(
254 "Connection attempts exceeded maximum duration ({:?})",
255 self.config.max_duration
256 );
257 return Err(ReplicationError::connection(format!(
258 "Connection attempts exceeded maximum duration of {:?}",
259 self.config.max_duration
260 )));
261 }
262
263 match PgReplicationConnection::connect(&self.connection_string) {
264 Ok(conn) => {
265 let elapsed = start_time.elapsed();
266 info!(
267 "Successfully connected to PostgreSQL on attempt {} after {:?}",
268 attempt, elapsed
269 );
270 return Ok(conn);
271 }
272 Err(e) => {
273 let error_msg = e.to_string();
274 error!("Connection attempt {} failed: {}", attempt, error_msg);
275
276 if attempt >= self.config.max_attempts {
278 let elapsed = start_time.elapsed();
279 error!("All connection attempts failed after {:?}", elapsed);
280 return Err(e);
281 }
282
283 let delay = backoff.next_delay();
285 debug!("Waiting {:?} before next attempt", delay);
286 tokio::time::sleep(delay).await;
287 }
288 }
289 }
290
291 let elapsed = start_time.elapsed();
293 error!("Connection failed after all attempts and {:?}", elapsed);
294 Err(ReplicationError::connection(
295 "Connection failed after all retry attempts".to_string(),
296 ))
297 }
298}
299
#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline configuration shared by the tests: 100 ms initial delay,
    /// doubling, 60 s cap, jitter disabled. Individual tests override single
    /// fields with struct-update syntax.
    fn base_config() -> RetryConfig {
        RetryConfig {
            max_attempts: 5,
            initial_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(60),
            multiplier: 2.0,
            max_duration: Duration::from_secs(300),
            jitter: false,
        }
    }

    #[test]
    fn test_exponential_backoff_basic() {
        let mut backoff = ExponentialBackoff::new(&base_config());

        let first_delay = backoff.next_delay();
        assert_eq!(first_delay, Duration::from_millis(100));
        assert_eq!(backoff.attempt(), 1);

        let second_delay = backoff.next_delay();
        assert_eq!(second_delay, Duration::from_millis(200));
        assert_eq!(backoff.attempt(), 2);

        let third_delay = backoff.next_delay();
        assert_eq!(third_delay, Duration::from_millis(400));
        assert_eq!(backoff.attempt(), 3);
    }

    #[test]
    fn test_exponential_backoff_max_delay() {
        let config = RetryConfig {
            max_attempts: 10,
            max_delay: Duration::from_millis(500),
            ..base_config()
        };

        let mut backoff = ExponentialBackoff::new(&config);

        // Growth below the cap is untouched.
        assert_eq!(backoff.next_delay(), Duration::from_millis(100));
        assert_eq!(backoff.next_delay(), Duration::from_millis(200));
        assert_eq!(backoff.next_delay(), Duration::from_millis(400));

        // 800 ms would exceed the cap, so the delay pins at 500 ms and stays there.
        assert_eq!(backoff.next_delay(), Duration::from_millis(500));
        assert_eq!(backoff.next_delay(), Duration::from_millis(500));
    }

    #[test]
    fn test_exponential_backoff_reset() {
        let mut backoff = ExponentialBackoff::new(&base_config());

        backoff.next_delay();
        backoff.next_delay();
        assert_eq!(backoff.attempt(), 2);

        backoff.reset();
        assert_eq!(backoff.attempt(), 0);

        let delay = backoff.next_delay();
        assert_eq!(delay, Duration::from_millis(100));
        assert_eq!(backoff.attempt(), 1);
    }

    #[test]
    fn test_exponential_backoff_jitter() {
        let config = RetryConfig {
            jitter: true,
            ..base_config()
        };

        let mut backoff = ExponentialBackoff::new(&config);

        // Jitter is random, so check the ±30% bounds over several samples
        // rather than a single draw.
        for _ in 0..50 {
            backoff.reset();
            let delay = backoff.next_delay();
            assert!(delay >= Duration::from_millis(70));
            assert!(delay <= Duration::from_millis(130));
        }
    }

    #[test]
    fn test_max_attempts() {
        let config = RetryConfig {
            max_attempts: 3,
            ..base_config()
        };

        let mut backoff = ExponentialBackoff::new(&config);

        backoff.next_delay();
        assert_eq!(backoff.attempt(), 1);
        backoff.next_delay();
        assert_eq!(backoff.attempt(), 2);
        backoff.next_delay();
        assert_eq!(backoff.attempt(), 3);

        assert_eq!(backoff.attempt(), config.max_attempts);
    }

    #[test]
    fn test_multiplier_effect() {
        let config = RetryConfig {
            initial_delay: Duration::from_millis(10),
            multiplier: 3.0,
            ..base_config()
        };

        let mut backoff = ExponentialBackoff::new(&config);

        assert_eq!(backoff.next_delay(), Duration::from_millis(10));
        assert_eq!(backoff.next_delay(), Duration::from_millis(30));
        assert_eq!(backoff.next_delay(), Duration::from_millis(90));
    }

    #[test]
    fn test_retry_config_default() {
        let config = RetryConfig::default();
        assert_eq!(config.max_attempts, 5);
        assert_eq!(config.initial_delay, Duration::from_secs(1));
        assert_eq!(config.max_delay, Duration::from_secs(60));
        assert_eq!(config.multiplier, 2.0);
        assert_eq!(config.max_duration, Duration::from_secs(300));
        assert!(config.jitter);
    }

    #[test]
    fn test_backoff_current_delay_field() {
        let mut backoff = ExponentialBackoff::new(&base_config());

        assert_eq!(backoff.current_delay, Duration::from_millis(100));
        backoff.next_delay();
        assert_eq!(backoff.current_delay, Duration::from_millis(200));
    }

    #[test]
    fn test_reset_clears_state() {
        let mut backoff = ExponentialBackoff::new(&base_config());

        backoff.next_delay();
        backoff.next_delay();

        backoff.reset();

        assert_eq!(backoff.attempt(), 0);
        assert_eq!(backoff.current_delay, Duration::from_millis(100));
    }

    #[test]
    fn test_to_backoff() {
        let config = RetryConfig::default();
        let backoff = config.to_backoff();
        assert_eq!(backoff.attempt(), 0);
        assert_eq!(backoff.current_delay, config.initial_delay);
    }
}