// pg_walstream/retry.rs

1//! Connection retry logic with exponential backoff
2//!
3//! This module provides retry mechanisms for PostgreSQL replication connections,
4//! with configurable backoff strategies and error categorization.
5
6use crate::connection::PgReplicationConnection;
7use crate::error::{ReplicationError, Result};
8use std::time::{Duration, Instant};
9use tracing::{debug, error, info};
10
11/// Configuration for retry logic
12///
13/// Defines the parameters for exponential backoff retry behavior when
14/// connecting to PostgreSQL or recovering from transient failures.
15///
16/// # Example
17///
18/// ```
19/// use pg_walstream::RetryConfig;
20/// use std::time::Duration;
21///
22/// // Use default configuration (5 attempts, 1s to 60s delays)
23/// let config = RetryConfig::default();
24///
25/// // Or create custom configuration
26/// let custom_config = RetryConfig {
27///     max_attempts: 10,
28///     initial_delay: Duration::from_millis(500),
29///     max_delay: Duration::from_secs(30),
30///     multiplier: 2.0,
31///     max_duration: Duration::from_secs(600),
32///     jitter: true,
33/// };
34/// ```
#[derive(Debug, Copy, Clone)]
pub struct RetryConfig {
    /// Maximum number of connection attempts before giving up entirely.
    pub max_attempts: u32,
    /// Delay used for the first retry; subsequent delays grow from this value.
    pub initial_delay: Duration,
    /// Upper bound on any single delay (caps the exponential growth).
    pub max_delay: Duration,
    /// Growth factor applied to the delay after each attempt (typically 2.0).
    pub multiplier: f64,
    /// Total wall-clock budget across all attempts; checked before each try
    /// by `connect_with_retry`.
    pub max_duration: Duration,
    /// When true, each delay is randomized by roughly ±30% to reduce the
    /// thundering-herd effect when many clients reconnect simultaneously.
    pub jitter: bool,
}
50
51impl Default for RetryConfig {
52    fn default() -> Self {
53        Self {
54            max_attempts: 5,
55            initial_delay: Duration::from_secs(1),
56            max_delay: Duration::from_secs(60),
57            multiplier: 2.0,
58            max_duration: Duration::from_secs(300),
59            jitter: true,
60        }
61    }
62}
63
/// Custom exponential backoff implementation
///
/// A small state machine that yields successively longer delays. Construct
/// it from a [`RetryConfig`] via `ExponentialBackoff::new` and advance it
/// with `next_delay()`; `reset()` returns it to the initial state.
#[derive(Debug, Clone)]
pub struct ExponentialBackoff {
    // Immutable parameters, copied from the RetryConfig at construction.
    initial_delay: Duration,
    max_delay: Duration,
    multiplier: f64,
    jitter: bool,
    // Mutable state: the (pre-jitter) delay the next call will return, and
    // how many delays have been handed out since the last reset.
    current_delay: Duration,
    attempt: u32,
}
74
75impl ExponentialBackoff {
76    /// Create a new exponential backoff instance
77    ///
78    /// Initializes the backoff state machine with parameters from the provided
79    /// RetryConfig. The backoff starts at `initial_delay` and grows by `multiplier`
80    /// on each call to `next_delay()`, up to `max_delay`.
81    ///
82    /// # Arguments
83    ///
84    /// * `config` - Retry configuration specifying backoff parameters
85    ///
86    /// # Example
87    ///
88    /// ```
89    /// use pg_walstream::{ExponentialBackoff, RetryConfig};
90    ///
91    /// let config = RetryConfig::default();
92    /// let mut backoff = ExponentialBackoff::new(&config);
93    ///
94    /// let delay1 = backoff.next_delay();
95    /// let delay2 = backoff.next_delay();
96    /// assert!(delay2 > delay1); // Delay increases exponentially
97    /// ```
98    pub fn new(config: &RetryConfig) -> Self {
99        Self {
100            initial_delay: config.initial_delay,
101            max_delay: config.max_delay,
102            multiplier: config.multiplier,
103            jitter: config.jitter,
104            current_delay: config.initial_delay,
105            attempt: 0,
106        }
107    }
108
109    /// Get the next delay duration
110    ///
111    /// Returns the current delay and advances the internal state for the next call.
112    /// Each call increases the delay by the configured multiplier until it reaches
113    /// the maximum delay. Optional jitter is applied to prevent thundering herd problems.
114    ///
115    /// # Returns
116    ///
117    /// Duration to wait before the next retry attempt.
118    ///
119    /// # Example
120    ///
121    /// ```
122    /// use pg_walstream::{ExponentialBackoff, RetryConfig};
123    /// use std::time::Duration;
124    ///
125    /// let config = RetryConfig {
126    ///     max_attempts: 5,
127    ///     initial_delay: Duration::from_millis(100),
128    ///     max_delay: Duration::from_secs(10),
129    ///     multiplier: 2.0,
130    ///     max_duration: Duration::from_secs(60),
131    ///     jitter: false,
132    /// };
133    ///
134    /// let mut backoff = ExponentialBackoff::new(&config);
135    /// let d1 = backoff.next_delay(); // ~100ms
136    /// let d2 = backoff.next_delay(); // ~200ms
137    /// let d3 = backoff.next_delay(); // ~400ms
138    /// ```
139    pub fn next_delay(&mut self) -> Duration {
140        let delay = self.current_delay;
141
142        // Calculate next delay with exponential backoff
143        let next_delay_ms = (self.current_delay.as_millis() as f64 * self.multiplier) as u64;
144        self.current_delay = Duration::from_millis(next_delay_ms).min(self.max_delay);
145        self.attempt += 1;
146
147        // Add jitter if enabled
148        if self.jitter {
149            self.add_jitter(delay)
150        } else {
151            delay
152        }
153    }
154
155    /// Add jitter to the delay (±30% randomization)
156    fn add_jitter(&self, delay: Duration) -> Duration {
157        // Simple jitter implementation without external dependencies
158        // Use current time as a simple source of randomness
159        let now = Instant::now();
160        let nanos = now.elapsed().subsec_nanos();
161        let jitter_factor = 0.3;
162
163        // Calculate jitter as ±30% of the delay
164        let base_millis = delay.as_millis() as f64;
165        let jitter_range = base_millis * jitter_factor;
166        let jitter = (nanos % 1000) as f64 / 1000.0; // 0.0 to 1.0
167        let jitter_adjustment = (jitter - 0.5) * 2.0 * jitter_range; // -jitter_range to +jitter_range
168
169        let final_millis = (base_millis + jitter_adjustment).max(0.0) as u64;
170        Duration::from_millis(final_millis)
171    }
172
173    /// Reset the backoff to initial state
174    pub fn reset(&mut self) {
175        self.current_delay = self.initial_delay;
176        self.attempt = 0;
177    }
178
179    /// Get current attempt number
180    pub fn attempt(&self) -> u32 {
181        self.attempt
182    }
183}
184
impl RetryConfig {
    /// Create an exponential backoff policy from retry configuration
    ///
    /// Returns a fresh [`ExponentialBackoff`] starting at `initial_delay`
    /// with zero attempts recorded; the config itself is not consumed and
    /// can produce any number of independent backoff instances.
    pub fn to_backoff(&self) -> ExponentialBackoff {
        ExponentialBackoff::new(self)
    }
}
191
/// Retry wrapper for PostgreSQL replication connection operations
///
/// Owns the retry policy and the target connection string; the retry loop
/// itself lives in `connect_with_retry`.
pub struct ReplicationConnectionRetry {
    // Backoff policy applied between failed connection attempts.
    config: RetryConfig,
    // PostgreSQL connection string. Per the module examples it is expected to
    // include `replication=database` — TODO confirm whether the connection
    // layer validates this.
    connection_string: String,
}
197
198impl ReplicationConnectionRetry {
199    /// Create a new retry wrapper
200    pub fn new(config: RetryConfig, connection_string: String) -> Self {
201        Self {
202            config,
203            connection_string,
204        }
205    }
206
207    /// Retry connection establishment with exponential backoff
208    ///
209    /// Attempts to establish a PostgreSQL replication connection with automatic
210    /// retry logic. Uses exponential backoff with optional jitter to handle
211    /// transient connection failures gracefully.
212    ///
213    /// # Returns
214    ///
215    /// Returns a connected `PgReplicationConnection` on success.
216    ///
217    /// # Errors
218    ///
219    /// Returns an error if:
220    /// - All retry attempts are exhausted
221    /// - Maximum duration is exceeded
222    /// - A permanent error occurs (authentication, unsupported version, etc.)
223    ///
224    /// # Example
225    ///
226    /// ```no_run
227    /// use pg_walstream::{ReplicationConnectionRetry, RetryConfig};
228    ///
229    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
230    /// let retry_handler = ReplicationConnectionRetry::new(
231    ///     RetryConfig::default(),
232    ///     "postgresql://postgres:password@localhost/mydb?replication=database".to_string(),
233    /// );
234    ///
235    /// let connection = retry_handler.connect_with_retry().await?;
236    /// # Ok(())
237    /// # }
238    /// ```
239    pub async fn connect_with_retry(&self) -> Result<PgReplicationConnection> {
240        let start_time = Instant::now();
241        let mut backoff = self.config.to_backoff();
242
243        info!("Attempting to connect to PostgreSQL with retry logic");
244
245        for attempt in 1..=self.config.max_attempts {
246            debug!(
247                "Attempting PostgreSQL connection (attempt {}/{})",
248                attempt, self.config.max_attempts
249            );
250
251            // Check if we've exceeded the maximum duration
252            if start_time.elapsed() >= self.config.max_duration {
253                error!(
254                    "Connection attempts exceeded maximum duration ({:?})",
255                    self.config.max_duration
256                );
257                return Err(ReplicationError::connection(format!(
258                    "Connection attempts exceeded maximum duration of {:?}",
259                    self.config.max_duration
260                )));
261            }
262
263            match PgReplicationConnection::connect(&self.connection_string) {
264                Ok(conn) => {
265                    let elapsed = start_time.elapsed();
266                    info!(
267                        "Successfully connected to PostgreSQL on attempt {} after {:?}",
268                        attempt, elapsed
269                    );
270                    return Ok(conn);
271                }
272                Err(e) => {
273                    let error_msg = e.to_string();
274                    error!("Connection attempt {} failed: {}", attempt, error_msg);
275
276                    // If this is the last attempt, return the error
277                    if attempt >= self.config.max_attempts {
278                        let elapsed = start_time.elapsed();
279                        error!("All connection attempts failed after {:?}", elapsed);
280                        return Err(e);
281                    }
282
283                    // Wait before the next attempt
284                    let delay = backoff.next_delay();
285                    debug!("Waiting {:?} before next attempt", delay);
286                    tokio::time::sleep(delay).await;
287                }
288            }
289        }
290
291        // This should never be reached due to the loop logic above
292        let elapsed = start_time.elapsed();
293        error!("Connection failed after all attempts and {:?}", elapsed);
294        Err(ReplicationError::connection(
295            "Connection failed after all retry attempts".to_string(),
296        ))
297    }
298}
299
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a RetryConfig for tests. `max_duration` is fixed at 300 s since
    /// no test exercises the total-duration budget.
    fn cfg(
        max_attempts: u32,
        initial_delay: Duration,
        max_delay: Duration,
        multiplier: f64,
        jitter: bool,
    ) -> RetryConfig {
        RetryConfig {
            max_attempts,
            initial_delay,
            max_delay,
            multiplier,
            max_duration: Duration::from_secs(300),
            jitter,
        }
    }

    #[test]
    fn test_exponential_backoff_basic() {
        let config = cfg(5, Duration::from_millis(100), Duration::from_secs(60), 2.0, false);
        let mut backoff = ExponentialBackoff::new(&config);

        // Delays double on each call: 100ms, 200ms, 400ms, with the attempt
        // counter advancing alongside.
        for (i, expected_ms) in [100u64, 200, 400].into_iter().enumerate() {
            assert_eq!(backoff.next_delay(), Duration::from_millis(expected_ms));
            assert_eq!(backoff.attempt(), (i + 1) as u32);
        }
    }

    #[test]
    fn test_exponential_backoff_max_delay() {
        // Low max delay so the cap is hit quickly.
        let config = cfg(10, Duration::from_millis(100), Duration::from_millis(500), 2.0, false);
        let mut backoff = ExponentialBackoff::new(&config);

        // 100ms, 200ms, 400ms — then 800ms would exceed the cap.
        for _ in 0..3 {
            backoff.next_delay();
        }
        let capped = backoff.next_delay();
        assert!(capped <= Duration::from_millis(500));
    }

    #[test]
    fn test_exponential_backoff_reset() {
        let config = cfg(5, Duration::from_millis(100), Duration::from_secs(60), 2.0, false);
        let mut backoff = ExponentialBackoff::new(&config);

        backoff.next_delay();
        backoff.next_delay();
        assert_eq!(backoff.attempt(), 2);

        // reset() restores the pristine state: counter at 0, delay back to initial.
        backoff.reset();
        assert_eq!(backoff.attempt(), 0);
        assert_eq!(backoff.next_delay(), Duration::from_millis(100));
        assert_eq!(backoff.attempt(), 1);
    }

    #[test]
    fn test_exponential_backoff_jitter() {
        let config = cfg(5, Duration::from_millis(100), Duration::from_secs(60), 2.0, true);
        let mut backoff = ExponentialBackoff::new(&config);

        // With ±30% jitter, a nominal 100ms delay must land in [70ms, 130ms].
        let delay = backoff.next_delay();
        assert!(delay >= Duration::from_millis(70));
        assert!(delay <= Duration::from_millis(130));
    }

    #[test]
    fn test_max_attempts() {
        let config = cfg(3, Duration::from_millis(100), Duration::from_secs(60), 2.0, false);
        let mut backoff = ExponentialBackoff::new(&config);

        for expected in 1..=3u32 {
            backoff.next_delay();
            assert_eq!(backoff.attempt(), expected);
        }

        // After 3 delays we have reached max_attempts.
        assert_eq!(backoff.attempt(), config.max_attempts);
    }

    #[test]
    fn test_multiplier_effect() {
        // 3x multiplier: 10ms, 30ms, 90ms.
        let config = cfg(5, Duration::from_millis(10), Duration::from_secs(60), 3.0, false);
        let mut backoff = ExponentialBackoff::new(&config);

        assert_eq!(backoff.next_delay(), Duration::from_millis(10));
        assert_eq!(backoff.next_delay(), Duration::from_millis(30));
        assert_eq!(backoff.next_delay(), Duration::from_millis(90));
    }

    #[test]
    fn test_retry_config_default() {
        let config = RetryConfig::default();
        assert_eq!(config.max_attempts, 5);
        assert_eq!(config.initial_delay, Duration::from_secs(1));
        assert_eq!(config.max_delay, Duration::from_secs(60));
        assert_eq!(config.multiplier, 2.0);
    }

    #[test]
    fn test_backoff_current_delay_field() {
        let config = cfg(5, Duration::from_millis(100), Duration::from_secs(60), 2.0, false);
        let mut backoff = ExponentialBackoff::new(&config);

        // The internal state advances past the value that was just returned.
        assert_eq!(backoff.current_delay, Duration::from_millis(100));
        backoff.next_delay();
        assert_eq!(backoff.current_delay, Duration::from_millis(200));
    }

    #[test]
    fn test_reset_clears_state() {
        let config = cfg(5, Duration::from_millis(100), Duration::from_secs(60), 2.0, false);
        let mut backoff = ExponentialBackoff::new(&config);

        backoff.next_delay();
        backoff.next_delay();
        backoff.reset();

        assert_eq!(backoff.attempt(), 0);
        assert_eq!(backoff.current_delay, Duration::from_millis(100));
    }

    #[test]
    fn test_to_backoff() {
        let config = RetryConfig::default();
        let backoff = config.to_backoff();
        assert_eq!(backoff.attempt(), 0);
        assert_eq!(backoff.current_delay, config.initial_delay);
    }
}