Skip to main content

noxu_rep/stream/
reconnect.rs

//! Replica-side reconnection with exponential backoff.
//!
//! When a network partition or master crash disconnects the replica's
//! replication stream, the [`catch_up_with_retry`] function wraps the
//! existing `catch_up_from_peer` call in a retry loop with configurable
//! exponential backoff and jitter.
7
8use std::net::SocketAddr;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::time::Duration;
12
13use crate::error::RepError;
14use crate::stream::peer_feeder::catch_up_from_peer;
15use crate::stream::replica_stream::LogWriter;
16
/// Configuration for replica reconnection backoff.
///
/// The delay before retry `n` (0-indexed) is
/// `initial_backoff_ms * backoff_factor^n`, capped at `max_backoff_ms`, and
/// then shifted by a jitter term. See [`ReconnectConfig::next_backoff`].
#[derive(Debug, Clone)]
pub struct ReconnectConfig {
    /// Initial backoff duration in milliseconds (default: 100).
    pub initial_backoff_ms: u64,
    /// Cap applied to the exponential term, in milliseconds (default: 30_000).
    ///
    /// Jitter is added *after* this cap, so the delay actually returned by
    /// [`ReconnectConfig::next_backoff`] can exceed it by up to
    /// `max_backoff_ms * jitter_fraction / 2`.
    pub max_backoff_ms: u64,
    /// Multiplicative backoff factor (default: 2.0).
    pub backoff_factor: f64,
    /// Maximum number of retry attempts. 0 means unlimited (default: 0).
    pub max_retries: u32,
    /// Fraction of the current backoff to use as jitter range (default: 0.25).
    ///
    /// The actual sleep is `backoff +/- (backoff * jitter_fraction / 2)`.
    pub jitter_fraction: f64,
}

impl Default for ReconnectConfig {
    /// Returns the defaults documented on each field: 100 ms initial delay,
    /// 30 s cap, doubling factor, unlimited retries and 25 % jitter.
    fn default() -> Self {
        Self {
            initial_backoff_ms: 100,
            max_backoff_ms: 30_000,
            backoff_factor: 2.0,
            max_retries: 0,
            jitter_fraction: 0.25,
        }
    }
}

impl ReconnectConfig {
    /// Calculate the backoff duration for the given attempt number (0-indexed).
    ///
    /// Applies exponential growth capped at `max_backoff_ms`, then adds
    /// symmetric jitter within `+/- jitter_fraction/2` of the capped value.
    /// The result is never shorter than 1 ms.
    ///
    /// The jitter is a pure function of `attempt`, so the same configuration
    /// and attempt number always produce the same duration.
    pub fn next_backoff(&self, attempt: u32) -> Duration {
        // `powi` takes an `i32` exponent. A plain `attempt as i32` wraps to a
        // negative value once `attempt > i32::MAX`, which would turn the
        // growth into decay and collapse the delay; clamp instead so very
        // large attempt numbers simply stay at the cap.
        let exponent = attempt.min(i32::MAX as u32) as i32;
        let base = (self.initial_backoff_ms as f64) * self.backoff_factor.powi(exponent);
        let capped = base.min(self.max_backoff_ms as f64);

        // Deterministic jitter from a Knuth multiplicative hash of the
        // attempt number. It varies the delay from one attempt to the next.
        // NOTE(review): the value depends only on `attempt`, so processes
        // sharing this configuration compute identical delays — confirm
        // whether cross-replica desynchronization is required.
        let jitter_seed = attempt.wrapping_mul(2_654_435_761);
        let jitter_norm = f64::from(jitter_seed % 1000) / 1000.0; // 0.0..=0.999
        let jitter_range = capped * self.jitter_fraction;
        let jitter = (jitter_norm - 0.5) * jitter_range;

        // Float-to-int `as` saturates, and the 1.0 floor keeps the delay
        // strictly positive even when jitter outweighs the base.
        let ms = (capped + jitter).max(1.0) as u64;
        Duration::from_millis(ms)
    }
}
67
/// Terminal result reported by `catch_up_with_retry`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReconnectOutcome {
    /// The catch-up call against the peer completed successfully.
    CaughtUp,
    /// The peer reported that a full restore is needed (VLSN too old).
    NeedsRestore,
    /// The retry budget ran out before any attempt succeeded.
    ///
    /// `catch_up_with_retry` also returns this variant when it encounters an
    /// error it classifies as non-retryable.
    MaxRetriesExceeded,
    /// The shutdown flag was observed before an attempt succeeded.
    Shutdown,
}
80
81/// Retry wrapper around [`catch_up_from_peer`].
82///
83/// Calls `catch_up_from_peer` in a loop. On connection failure, sleeps
84/// according to [`ReconnectConfig`] backoff and retries. The loop exits when:
85///
86/// - The catch-up succeeds (`Ok(true)`) — returns [`ReconnectOutcome::CaughtUp`].
87/// - The peer requires a full restore (`Ok(false)`) — returns [`ReconnectOutcome::NeedsRestore`].
88/// - `max_retries` is exceeded (and non-zero) — returns [`ReconnectOutcome::MaxRetriesExceeded`].
89/// - `shutdown` flag is set — returns [`ReconnectOutcome::Shutdown`].
90pub fn catch_up_with_retry(
91    peer_addr: SocketAddr,
92    start_vlsn: u64,
93    log_writer: &mut dyn LogWriter,
94    config: &ReconnectConfig,
95    shutdown: &Arc<AtomicBool>,
96) -> ReconnectOutcome {
97    let mut attempt: u32 = 0;
98
99    loop {
100        // Check shutdown before each attempt.
101        if shutdown.load(Ordering::Acquire) {
102            log::info!(
103                "reconnect: shutdown signalled before attempt {}; exiting",
104                attempt
105            );
106            return ReconnectOutcome::Shutdown;
107        }
108
109        match catch_up_from_peer(peer_addr, start_vlsn, log_writer) {
110            Ok(true) => {
111                if attempt > 0 {
112                    log::info!(
113                        "reconnect: successfully caught up from {} after {} retries",
114                        peer_addr,
115                        attempt
116                    );
117                }
118                return ReconnectOutcome::CaughtUp;
119            }
120            Ok(false) => {
121                log::warn!(
122                    "reconnect: peer {} requires full restore (VLSN {} too old)",
123                    peer_addr,
124                    start_vlsn
125                );
126                return ReconnectOutcome::NeedsRestore;
127            }
128            Err(e) => {
129                // Check if this is a non-retryable error.
130                if !is_retryable(&e) {
131                    log::error!(
132                        "reconnect: non-retryable error from {}: {}",
133                        peer_addr,
134                        e
135                    );
136                    // Treat non-retryable errors like max retries exceeded.
137                    return ReconnectOutcome::MaxRetriesExceeded;
138                }
139
140                // Check max_retries limit.
141                if config.max_retries > 0 && attempt >= config.max_retries {
142                    log::warn!(
143                        "reconnect: max retries ({}) exceeded for {}; last error: {}",
144                        config.max_retries,
145                        peer_addr,
146                        e
147                    );
148                    return ReconnectOutcome::MaxRetriesExceeded;
149                }
150
151                let backoff = config.next_backoff(attempt);
152                log::warn!(
153                    "reconnect: attempt {} to {} failed ({}); retrying in {:?}",
154                    attempt,
155                    peer_addr,
156                    e,
157                    backoff
158                );
159
160                // Sleep in small increments to allow shutdown detection.
161                let sleep_end = std::time::Instant::now() + backoff;
162                while std::time::Instant::now() < sleep_end {
163                    if shutdown.load(Ordering::Acquire) {
164                        log::info!(
165                            "reconnect: shutdown signalled during backoff"
166                        );
167                        return ReconnectOutcome::Shutdown;
168                    }
169                    let remaining = sleep_end
170                        .saturating_duration_since(std::time::Instant::now());
171                    std::thread::sleep(
172                        remaining.min(Duration::from_millis(100)),
173                    );
174                }
175
176                attempt = attempt.saturating_add(1);
177            }
178        }
179    }
180}
181
182/// Determine whether an error is retryable (transient network issue).
183fn is_retryable(err: &RepError) -> bool {
184    matches!(
185        err,
186        RepError::NetworkError(_)
187            | RepError::ChannelClosed(_)
188            | RepError::FrameCorrupted { .. }
189    )
190}
191
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::Result;

    /// Builds a config with unlimited retries from the four backoff knobs.
    fn backoff_cfg(
        initial_backoff_ms: u64,
        max_backoff_ms: u64,
        backoff_factor: f64,
        jitter_fraction: f64,
    ) -> ReconnectConfig {
        ReconnectConfig {
            initial_backoff_ms,
            max_backoff_ms,
            backoff_factor,
            max_retries: 0,
            jitter_fraction,
        }
    }

    /// Address used by the retry tests; nothing is expected to listen on port 1.
    fn dead_peer() -> SocketAddr {
        "127.0.0.1:1".parse().unwrap()
    }

    // -----------------------------------------------------------------------
    // ReconnectConfig tests
    // -----------------------------------------------------------------------

    #[test]
    fn test_default_config() {
        let ReconnectConfig {
            initial_backoff_ms,
            max_backoff_ms,
            backoff_factor,
            max_retries,
            jitter_fraction,
        } = ReconnectConfig::default();

        assert_eq!(initial_backoff_ms, 100);
        assert_eq!(max_backoff_ms, 30_000);
        assert_eq!(backoff_factor, 2.0);
        assert_eq!(max_retries, 0);
        assert_eq!(jitter_fraction, 0.25);
    }

    #[test]
    fn test_backoff_exponential_growth() {
        // Jitter is disabled, so every delay must be exactly exponential.
        let cfg = backoff_cfg(100, 60_000, 2.0, 0.0);

        let expected: [(u32, u128); 4] = [(0, 100), (1, 200), (2, 400), (3, 800)];
        for &(attempt, millis) in expected.iter() {
            assert_eq!(cfg.next_backoff(attempt).as_millis(), millis);
        }
    }

    #[test]
    fn test_backoff_capped_at_max() {
        let cfg = backoff_cfg(1000, 5000, 3.0, 0.0);

        // 1000, 3000, then 9000 and beyond are clamped to the 5000 cap.
        let expected: [(u32, u128); 4] = [(0, 1000), (1, 3000), (2, 5000), (3, 5000)];
        for &(attempt, millis) in expected.iter() {
            assert_eq!(cfg.next_backoff(attempt).as_millis(), millis);
        }
    }

    #[test]
    fn test_backoff_with_jitter_bounded() {
        let cfg = backoff_cfg(1000, 60_000, 2.0, 0.5);

        // A 50% jitter fraction keeps the first delay within [750, 1250].
        let b = cfg.next_backoff(0).as_millis();
        assert!(b >= 750, "backoff {} < 750", b);
        assert!(b <= 1250, "backoff {} > 1250", b);
    }

    #[test]
    fn test_backoff_never_zero() {
        // Tiny base with maximum jitter: the 1 ms floor must still hold.
        let cfg = backoff_cfg(1, 1, 1.0, 1.0);

        (0..20).for_each(|attempt| {
            let b = cfg.next_backoff(attempt);
            assert!(b.as_millis() >= 1);
        });
    }

    // -----------------------------------------------------------------------
    // catch_up_with_retry tests (using mock channel infrastructure)
    // -----------------------------------------------------------------------

    #[test]
    fn test_shutdown_before_first_attempt() {
        /// Writer that fails the test if any entry reaches it.
        struct PanickingWriter;
        impl LogWriter for PanickingWriter {
            fn write_entry(&mut self, _: u64, _: u8, _: &[u8]) -> Result<()> {
                panic!("should not be called");
            }
        }

        let already_stopped = Arc::new(AtomicBool::new(true));
        let mut writer = PanickingWriter;

        let outcome = catch_up_with_retry(
            dead_peer(),
            0,
            &mut writer,
            &ReconnectConfig::default(),
            &already_stopped,
        );
        assert_eq!(outcome, ReconnectOutcome::Shutdown);
    }

    #[test]
    fn test_max_retries_exceeded() {
        /// Writer that accepts and discards every entry.
        struct DiscardingWriter;
        impl LogWriter for DiscardingWriter {
            fn write_entry(&mut self, _: u64, _: u8, _: &[u8]) -> Result<()> {
                Ok(())
            }
        }

        let running = Arc::new(AtomicBool::new(false));
        // 1 ms delays with no jitter keep the test fast; two retries allowed.
        let cfg = ReconnectConfig {
            max_retries: 2,
            ..backoff_cfg(1, 1, 1.0, 0.0)
        };
        let mut writer = DiscardingWriter;

        // The connection to `dead_peer()` is expected to fail every time.
        let outcome = catch_up_with_retry(dead_peer(), 0, &mut writer, &cfg, &running);
        assert_eq!(outcome, ReconnectOutcome::MaxRetriesExceeded);
    }

    #[test]
    fn test_is_retryable_network_error() {
        let transient = [
            RepError::NetworkError("timeout".into()),
            RepError::ChannelClosed("gone".into()),
            RepError::FrameCorrupted {
                vlsn: 1,
                expected: 0,
                actual: 1,
            },
        ];
        for err in transient.iter() {
            assert!(is_retryable(err));
        }

        let permanent = [
            RepError::ProtocolError("bad".into()),
            RepError::DatabaseError("disk".into()),
        ];
        for err in permanent.iter() {
            assert!(!is_retryable(err));
        }
    }
}