1use std::net::SocketAddr;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::time::Duration;
12
13use crate::error::RepError;
14use crate::stream::peer_feeder::catch_up_from_peer;
15use crate::stream::replica_stream::LogWriter;
16
/// Tuning knobs for the exponential-backoff retry loop in [`catch_up_with_retry`].
#[derive(Debug, Clone, PartialEq)]
pub struct ReconnectConfig {
    /// Delay before the first retry (attempt `0`), in milliseconds.
    pub initial_backoff_ms: u64,
    /// Upper bound on the exponential delay, in milliseconds. The cap is applied
    /// before jitter, so a jittered delay may exceed it by up to half of
    /// `jitter_fraction` of the cap.
    pub max_backoff_ms: u64,
    /// Multiplier applied to the delay for each successive attempt.
    pub backoff_factor: f64,
    /// Maximum number of retries after the initial attempt; `0` means retry
    /// indefinitely.
    pub max_retries: u32,
    /// Width of the jitter window as a fraction of the capped delay: the delay is
    /// shifted by a random offset in `[-jitter_fraction / 2, +jitter_fraction / 2)`
    /// of itself. `0.0` disables jitter.
    pub jitter_fraction: f64,
}

impl Default for ReconnectConfig {
    /// 100 ms initial delay, doubling up to 30 s, unlimited retries, ±12.5 % jitter.
    fn default() -> Self {
        Self {
            initial_backoff_ms: 100,
            max_backoff_ms: 30_000,
            backoff_factor: 2.0,
            max_retries: 0,
            jitter_fraction: 0.25,
        }
    }
}

impl ReconnectConfig {
    /// Computes the delay to wait before retry number `attempt` (0-based).
    ///
    /// The delay is `initial_backoff_ms * backoff_factor^attempt`, capped at
    /// `max_backoff_ms`, then shifted by a random jitter of up to
    /// `±jitter_fraction / 2` of the capped value. The result is never below 1 ms.
    ///
    /// Jitter is drawn fresh on every call, so many replicas that lost the same
    /// peer at the same moment do not all retry in lockstep.
    pub fn next_backoff(&self, attempt: u32) -> Duration {
        // `attempt as i32` would wrap to a negative exponent past `i32::MAX` and
        // collapse the delay towards zero; saturate so it stays pinned at the cap.
        let exponent = i32::try_from(attempt).unwrap_or(i32::MAX);
        let base = self.initial_backoff_ms as f64 * self.backoff_factor.powi(exponent);
        let capped = base.min(self.max_backoff_ms as f64);

        let jitter_range = capped * self.jitter_fraction;
        let jitter = (Self::random_unit(attempt) - 0.5) * jitter_range;

        // `max(1.0)` floors the delay at 1 ms (and maps a NaN from a degenerate
        // config to 1 ms, since `f64::max` returns the non-NaN operand).
        let ms = (capped + jitter).max(1.0) as u64;
        Duration::from_millis(ms)
    }

    /// Returns a pseudo-random value in `[0.0, 1.0)` that differs between calls.
    fn random_unit(attempt: u32) -> f64 {
        use std::collections::hash_map::RandomState;
        use std::hash::{BuildHasher, Hasher};

        // Every `RandomState::new()` is initialised with random keys, so hashing
        // through a fresh one yields an unpredictable value without an RNG crate.
        let mut hasher = RandomState::new().build_hasher();
        hasher.write_u32(attempt);
        // Keep the top 53 bits: they fit exactly in an f64 mantissa, giving a
        // uniform value strictly below 1.0.
        (hasher.finish() >> 11) as f64 / (1u64 << 53) as f64
    }
}
67
/// Final result of [`catch_up_with_retry`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReconnectOutcome {
    /// `catch_up_from_peer` reported success (`Ok(true)`).
    CaughtUp,
    /// `catch_up_from_peer` returned `Ok(false)`; the retry loop logs this as the
    /// peer requiring a full restore because the start VLSN is too old.
    NeedsRestore,
    /// The loop gave up: either the retry budget (`ReconnectConfig::max_retries`)
    /// was exhausted, or the peer returned an error classified as non-retryable.
    MaxRetriesExceeded,
    /// The shutdown flag was observed before an attempt or during a backoff wait.
    Shutdown,
}
80
81pub fn catch_up_with_retry(
91 peer_addr: SocketAddr,
92 start_vlsn: u64,
93 log_writer: &mut dyn LogWriter,
94 config: &ReconnectConfig,
95 shutdown: &Arc<AtomicBool>,
96) -> ReconnectOutcome {
97 let mut attempt: u32 = 0;
98
99 loop {
100 if shutdown.load(Ordering::Acquire) {
102 log::info!(
103 "reconnect: shutdown signalled before attempt {}; exiting",
104 attempt
105 );
106 return ReconnectOutcome::Shutdown;
107 }
108
109 match catch_up_from_peer(peer_addr, start_vlsn, log_writer) {
110 Ok(true) => {
111 if attempt > 0 {
112 log::info!(
113 "reconnect: successfully caught up from {} after {} retries",
114 peer_addr,
115 attempt
116 );
117 }
118 return ReconnectOutcome::CaughtUp;
119 }
120 Ok(false) => {
121 log::warn!(
122 "reconnect: peer {} requires full restore (VLSN {} too old)",
123 peer_addr,
124 start_vlsn
125 );
126 return ReconnectOutcome::NeedsRestore;
127 }
128 Err(e) => {
129 if !is_retryable(&e) {
131 log::error!(
132 "reconnect: non-retryable error from {}: {}",
133 peer_addr,
134 e
135 );
136 return ReconnectOutcome::MaxRetriesExceeded;
138 }
139
140 if config.max_retries > 0 && attempt >= config.max_retries {
142 log::warn!(
143 "reconnect: max retries ({}) exceeded for {}; last error: {}",
144 config.max_retries,
145 peer_addr,
146 e
147 );
148 return ReconnectOutcome::MaxRetriesExceeded;
149 }
150
151 let backoff = config.next_backoff(attempt);
152 log::warn!(
153 "reconnect: attempt {} to {} failed ({}); retrying in {:?}",
154 attempt,
155 peer_addr,
156 e,
157 backoff
158 );
159
160 let sleep_end = std::time::Instant::now() + backoff;
162 while std::time::Instant::now() < sleep_end {
163 if shutdown.load(Ordering::Acquire) {
164 log::info!(
165 "reconnect: shutdown signalled during backoff"
166 );
167 return ReconnectOutcome::Shutdown;
168 }
169 let remaining = sleep_end
170 .saturating_duration_since(std::time::Instant::now());
171 std::thread::sleep(
172 remaining.min(Duration::from_millis(100)),
173 );
174 }
175
176 attempt = attempt.saturating_add(1);
177 }
178 }
179 }
180}
181
182fn is_retryable(err: &RepError) -> bool {
184 matches!(
185 err,
186 RepError::NetworkError(_)
187 | RepError::ChannelClosed(_)
188 | RepError::FrameCorrupted { .. }
189 )
190}
191
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::Result;

    /// The default configuration values are part of the public contract.
    #[test]
    fn test_default_config() {
        let cfg = ReconnectConfig::default();
        assert_eq!(cfg.initial_backoff_ms, 100);
        assert_eq!(cfg.max_backoff_ms, 30_000);
        assert_eq!(cfg.backoff_factor, 2.0);
        assert_eq!(cfg.max_retries, 0);
        assert_eq!(cfg.jitter_fraction, 0.25);
    }

    #[test]
    fn test_backoff_exponential_growth() {
        let cfg = ReconnectConfig {
            initial_backoff_ms: 100,
            max_backoff_ms: 60_000,
            backoff_factor: 2.0,
            max_retries: 0,
            jitter_fraction: 0.0, // no jitter, so delays are exact
        };

        assert_eq!(cfg.next_backoff(0).as_millis(), 100);
        assert_eq!(cfg.next_backoff(1).as_millis(), 200);
        assert_eq!(cfg.next_backoff(2).as_millis(), 400);
        assert_eq!(cfg.next_backoff(3).as_millis(), 800);
    }

    #[test]
    fn test_backoff_capped_at_max() {
        let cfg = ReconnectConfig {
            initial_backoff_ms: 1000,
            max_backoff_ms: 5000,
            backoff_factor: 3.0,
            max_retries: 0,
            jitter_fraction: 0.0,
        };

        assert_eq!(cfg.next_backoff(0).as_millis(), 1000);
        assert_eq!(cfg.next_backoff(1).as_millis(), 3000);
        // 9000 and 27000 exceed the cap.
        assert_eq!(cfg.next_backoff(2).as_millis(), 5000);
        assert_eq!(cfg.next_backoff(3).as_millis(), 5000);
    }

    /// Jitter must keep every delay within ±jitter_fraction/2 of the capped
    /// delay, at every attempt and not only the first one.
    #[test]
    fn test_backoff_with_jitter_bounded() {
        let cfg = ReconnectConfig {
            initial_backoff_ms: 1000,
            max_backoff_ms: 60_000,
            backoff_factor: 2.0,
            max_retries: 0,
            jitter_fraction: 0.5,
        };

        for attempt in 0..6 {
            // 1000 * 2^attempt stays below max_backoff_ms for these attempts.
            let capped = 1000u128 << attempt;
            let lo = capped * 3 / 4;
            let hi = capped * 5 / 4;
            for _ in 0..20 {
                let b = cfg.next_backoff(attempt).as_millis();
                assert!(
                    (lo..=hi).contains(&b),
                    "attempt {}: backoff {} outside [{}, {}]",
                    attempt,
                    b,
                    lo,
                    hi
                );
            }
        }
    }

    #[test]
    fn test_backoff_never_zero() {
        let cfg = ReconnectConfig {
            initial_backoff_ms: 1,
            max_backoff_ms: 1,
            backoff_factor: 1.0,
            max_retries: 0,
            jitter_fraction: 1.0, // widest jitter could push a 1 ms delay towards 0
        };

        for attempt in 0..20 {
            assert!(cfg.next_backoff(attempt).as_millis() >= 1);
        }
    }

    /// A pre-set shutdown flag must short-circuit before any network I/O.
    #[test]
    fn test_shutdown_before_first_attempt() {
        struct NeverWriter;
        impl LogWriter for NeverWriter {
            fn write_entry(&mut self, _: u64, _: u8, _: &[u8]) -> Result<()> {
                panic!("should not be called");
            }
        }

        let shutdown = Arc::new(AtomicBool::new(true));
        let cfg = ReconnectConfig::default();
        let addr: SocketAddr = "127.0.0.1:1".parse().unwrap();

        let outcome = catch_up_with_retry(addr, 0, &mut NeverWriter, &cfg, &shutdown);
        assert_eq!(outcome, ReconnectOutcome::Shutdown);
    }

    #[test]
    fn test_max_retries_exceeded() {
        // Accepts and discards writes; no entries are expected to arrive.
        struct NoopWriter;
        impl LogWriter for NoopWriter {
            fn write_entry(&mut self, _: u64, _: u8, _: &[u8]) -> Result<()> {
                Ok(())
            }
        }

        let shutdown = Arc::new(AtomicBool::new(false));
        let cfg = ReconnectConfig {
            initial_backoff_ms: 1,
            max_backoff_ms: 1,
            backoff_factor: 1.0,
            max_retries: 2,
            jitter_fraction: 0.0,
        };
        // NOTE(review): assumes nothing listens on port 1, so every attempt fails.
        // A non-retryable error would yield the same outcome, so this does not
        // prove the retry path was taken.
        let addr: SocketAddr = "127.0.0.1:1".parse().unwrap();

        let outcome = catch_up_with_retry(addr, 0, &mut NoopWriter, &cfg, &shutdown);
        assert_eq!(outcome, ReconnectOutcome::MaxRetriesExceeded);
    }

    #[test]
    fn test_is_retryable_network_error() {
        assert!(is_retryable(&RepError::NetworkError("timeout".into())));
        assert!(is_retryable(&RepError::ChannelClosed("gone".into())));
        assert!(is_retryable(&RepError::FrameCorrupted {
            vlsn: 1,
            expected: 0,
            actual: 1,
        }));
        assert!(!is_retryable(&RepError::ProtocolError("bad".into())));
        assert!(!is_retryable(&RepError::DatabaseError("disk".into())));
    }
}