use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use crate::error::RepError;
use crate::stream::peer_feeder::catch_up_from_peer;
use crate::stream::replica_stream::LogWriter;
/// Tuning knobs for the exponential-backoff loop in [`catch_up_with_retry`].
#[derive(Debug, Clone)]
pub struct ReconnectConfig {
    /// Delay in milliseconds used for attempt 0 (the first retry).
    pub initial_backoff_ms: u64,
    /// Upper bound in milliseconds applied to the exponential term *before*
    /// jitter is added (so the final delay may slightly exceed it).
    pub max_backoff_ms: u64,
    /// Multiplier applied to the delay for each successive attempt.
    pub backoff_factor: f64,
    /// Number of retries allowed after the first failed attempt; `0` means
    /// retry forever.
    pub max_retries: u32,
    /// Width of the jitter window as a fraction of the capped delay; the delay
    /// is shifted by a value in `[-0.5, 0.5) * jitter_fraction * capped`.
    pub jitter_fraction: f64,
}

impl Default for ReconnectConfig {
    /// 100 ms initial delay, doubling up to 30 s, unlimited retries, 25 % jitter.
    fn default() -> Self {
        Self {
            initial_backoff_ms: 100,
            max_backoff_ms: 30_000,
            backoff_factor: 2.0,
            max_retries: 0,
            jitter_fraction: 0.25,
        }
    }
}

impl ReconnectConfig {
    /// Computes how long to wait after failed attempt number `attempt` (0-based).
    ///
    /// The delay is `initial_backoff_ms * backoff_factor^attempt`, capped at
    /// `max_backoff_ms`, then shifted by a jitter term in
    /// `[-0.5, 0.5) * jitter_fraction * capped`, and finally floored at 1 ms.
    /// Because jitter is applied after the cap, the result can exceed
    /// `max_backoff_ms` by up to half of the jitter window.
    ///
    /// NOTE(review): the jitter is derived deterministically from `attempt`
    /// (a multiplicative hash), so every process computes the same delay
    /// sequence; it does not de-synchronize replicas reconnecting at once.
    pub fn next_backoff(&self, attempt: u32) -> Duration {
        // `attempt as i32` would wrap to a negative exponent for attempts above
        // `i32::MAX`, making the delay shrink instead of staying at the cap.
        // Saturating keeps huge attempt counts pinned at `max_backoff_ms`.
        let exponent = i32::try_from(attempt).unwrap_or(i32::MAX);
        let base = (self.initial_backoff_ms as f64) * self.backoff_factor.powi(exponent);
        let capped = base.min(self.max_backoff_ms as f64);

        // Multiplicative hash of the attempt number, reduced to [0, 1).
        let jitter_seed = attempt.wrapping_mul(2_654_435_761);
        let jitter_norm = f64::from(jitter_seed % 1000) / 1000.0;
        let jitter_range = capped * self.jitter_fraction;
        let jitter = (jitter_norm - 0.5) * jitter_range;

        // Never hand back a zero-length delay; the float-to-int `as` cast
        // saturates rather than wrapping for very large values.
        let ms = (capped + jitter).max(1.0) as u64;
        Duration::from_millis(ms)
    }
}
/// Terminal result of [`catch_up_with_retry`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReconnectOutcome {
/// `catch_up_from_peer` returned `Ok(true)`.
CaughtUp,
/// `catch_up_from_peer` returned `Ok(false)`; the retry loop logs this as the
/// peer requiring a full restore because the start VLSN is too old.
NeedsRestore,
/// Retries were exhausted, OR a non-retryable error was returned
/// (both cases map to this variant).
MaxRetriesExceeded,
/// The shutdown flag was observed set, either before an attempt or while
/// sleeping between attempts.
Shutdown,
}
/// Drives [`catch_up_from_peer`] against `peer_addr`, sleeping with
/// exponential backoff between retryable failures.
///
/// Every attempt requests entries starting at `start_vlsn` and forwards them
/// to `log_writer`. The `shutdown` flag is polled (with `Acquire` ordering)
/// before each attempt and at least every 100 ms while backing off. It is
/// not checked while an attempt is in progress.
///
/// # Returns
/// - [`ReconnectOutcome::CaughtUp`] when an attempt returns `Ok(true)`.
/// - [`ReconnectOutcome::NeedsRestore`] when an attempt returns `Ok(false)`.
/// - [`ReconnectOutcome::MaxRetriesExceeded`] on a non-retryable error, or
///   once `config.max_retries` retries have failed (when it is non-zero).
/// - [`ReconnectOutcome::Shutdown`] when the shutdown flag is observed.
///
/// NOTE(review): each retry restarts from the same `start_vlsn`. If a failed
/// attempt already wrote some entries through `log_writer`, they are
/// presumably delivered again. Confirm that `LogWriter` tolerates this.
pub fn catch_up_with_retry(
    peer_addr: SocketAddr,
    start_vlsn: u64,
    log_writer: &mut dyn LogWriter,
    config: &ReconnectConfig,
    shutdown: &Arc<AtomicBool>,
) -> ReconnectOutcome {
    // Sleeps until `total` has elapsed, in slices of at most 100 ms so the
    // shutdown flag stays responsive. Returns `false` (after logging) as soon
    // as shutdown is observed, `true` once the full delay has passed.
    fn sleep_unless_shutdown(total: Duration, shutdown: &AtomicBool) -> bool {
        let deadline = std::time::Instant::now() + total;
        loop {
            if std::time::Instant::now() >= deadline {
                return true;
            }
            if shutdown.load(Ordering::Acquire) {
                log::info!(
                    "reconnect: shutdown signalled during backoff"
                );
                return false;
            }
            let left = deadline.saturating_duration_since(std::time::Instant::now());
            std::thread::sleep(left.min(Duration::from_millis(100)));
        }
    }

    // Number of failed attempts so far, which is also the retry count.
    let mut attempt: u32 = 0;
    loop {
        if shutdown.load(Ordering::Acquire) {
            log::info!(
                "reconnect: shutdown signalled before attempt {}; exiting",
                attempt
            );
            return ReconnectOutcome::Shutdown;
        }

        // Successful outcomes return immediately; only errors fall through.
        let failure = match catch_up_from_peer(peer_addr, start_vlsn, log_writer) {
            Ok(true) => {
                if attempt > 0 {
                    log::info!(
                        "reconnect: successfully caught up from {} after {} retries",
                        peer_addr,
                        attempt
                    );
                }
                return ReconnectOutcome::CaughtUp;
            }
            Ok(false) => {
                log::warn!(
                    "reconnect: peer {} requires full restore (VLSN {} too old)",
                    peer_addr,
                    start_vlsn
                );
                return ReconnectOutcome::NeedsRestore;
            }
            Err(e) => e,
        };

        if !is_retryable(&failure) {
            log::error!(
                "reconnect: non-retryable error from {}: {}",
                peer_addr,
                failure
            );
            return ReconnectOutcome::MaxRetriesExceeded;
        }

        // A zero limit disables the cap entirely (retry forever).
        let out_of_retries = config.max_retries != 0 && attempt >= config.max_retries;
        if out_of_retries {
            log::warn!(
                "reconnect: max retries ({}) exceeded for {}; last error: {}",
                config.max_retries,
                peer_addr,
                failure
            );
            return ReconnectOutcome::MaxRetriesExceeded;
        }

        let delay = config.next_backoff(attempt);
        log::warn!(
            "reconnect: attempt {} to {} failed ({}); retrying in {:?}",
            attempt,
            peer_addr,
            failure,
            delay
        );
        if !sleep_unless_shutdown(delay, shutdown) {
            return ReconnectOutcome::Shutdown;
        }
        attempt = attempt.saturating_add(1);
    }
}
/// Returns `true` for errors from a catch-up attempt that justify another try.
///
/// Corrupted frames, closed channels, and network errors are treated as
/// transient. Every other `RepError` variant ends the retry loop.
fn is_retryable(err: &RepError) -> bool {
    // Transport-level failures: the connection, or a frame on it, went bad,
    // so a fresh attempt may succeed.
    matches!(
        *err,
        RepError::FrameCorrupted { .. } | RepError::ChannelClosed(_) | RepError::NetworkError(_)
    )
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::Result;

    #[test]
    fn test_default_config() {
        let cfg = ReconnectConfig::default();
        assert_eq!(cfg.initial_backoff_ms, 100);
        assert_eq!(cfg.max_backoff_ms, 30_000);
        assert_eq!(cfg.backoff_factor, 2.0);
        assert_eq!(cfg.max_retries, 0);
        assert_eq!(cfg.jitter_fraction, 0.25);
    }

    #[test]
    fn test_backoff_exponential_growth() {
        let cfg = ReconnectConfig {
            initial_backoff_ms: 100,
            max_backoff_ms: 60_000,
            backoff_factor: 2.0,
            max_retries: 0,
            jitter_fraction: 0.0,
        };
        assert_eq!(cfg.next_backoff(0).as_millis(), 100);
        assert_eq!(cfg.next_backoff(1).as_millis(), 200);
        assert_eq!(cfg.next_backoff(2).as_millis(), 400);
        assert_eq!(cfg.next_backoff(3).as_millis(), 800);
    }

    #[test]
    fn test_backoff_capped_at_max() {
        let cfg = ReconnectConfig {
            initial_backoff_ms: 1000,
            max_backoff_ms: 5000,
            backoff_factor: 3.0,
            max_retries: 0,
            jitter_fraction: 0.0,
        };
        assert_eq!(cfg.next_backoff(0).as_millis(), 1000);
        assert_eq!(cfg.next_backoff(1).as_millis(), 3000);
        assert_eq!(cfg.next_backoff(2).as_millis(), 5000);
        assert_eq!(cfg.next_backoff(3).as_millis(), 5000);
    }

    /// Jitter must stay within `capped * (1 ± jitter_fraction / 2)`.
    ///
    /// Attempt 0 always hashes to the lower edge of the window, so several
    /// attempts are swept to also exercise the upper half of the range.
    #[test]
    fn test_backoff_with_jitter_bounded() {
        let cfg = ReconnectConfig {
            initial_backoff_ms: 1000,
            max_backoff_ms: 60_000,
            backoff_factor: 2.0,
            max_retries: 0,
            jitter_fraction: 0.5,
        };
        for attempt in 0u32..16 {
            let capped = (1000.0 * 2f64.powi(attempt as i32)).min(60_000.0);
            let b = cfg.next_backoff(attempt).as_millis() as f64;
            let lo = (capped * 0.75).floor();
            let hi = capped * 1.25;
            assert!(b >= lo, "attempt {}: backoff {} < {}", attempt, b, lo);
            assert!(b <= hi, "attempt {}: backoff {} > {}", attempt, b, hi);
        }
    }

    #[test]
    fn test_backoff_never_zero() {
        let cfg = ReconnectConfig {
            initial_backoff_ms: 1,
            max_backoff_ms: 1,
            backoff_factor: 1.0,
            max_retries: 0,
            jitter_fraction: 1.0,
        };
        for attempt in 0..20 {
            let b = cfg.next_backoff(attempt);
            assert!(b.as_millis() >= 1);
        }
    }

    #[test]
    fn test_shutdown_before_first_attempt() {
        /// Writer that must never be reached: shutdown is already set.
        struct NeverWriter;
        impl LogWriter for NeverWriter {
            fn write_entry(&mut self, _: u64, _: u8, _: &[u8]) -> Result<()> {
                panic!("should not be called");
            }
        }
        let shutdown = Arc::new(AtomicBool::new(true));
        let cfg = ReconnectConfig::default();
        let addr: SocketAddr = "127.0.0.1:1".parse().unwrap();
        let outcome = catch_up_with_retry(addr, 0, &mut NeverWriter, &cfg, &shutdown);
        assert_eq!(outcome, ReconnectOutcome::Shutdown);
    }

    /// NOTE(review): this relies on nothing listening on 127.0.0.1:1 and on
    /// `catch_up_from_peer` reporting that connection failure as a retryable
    /// error. Confirm this holds in the CI environment.
    #[test]
    fn test_max_retries_exceeded() {
        /// Writer that silently accepts every entry.
        struct AcceptingWriter;
        impl LogWriter for AcceptingWriter {
            fn write_entry(&mut self, _: u64, _: u8, _: &[u8]) -> Result<()> {
                Ok(())
            }
        }
        let shutdown = Arc::new(AtomicBool::new(false));
        let cfg = ReconnectConfig {
            initial_backoff_ms: 1,
            max_backoff_ms: 1,
            backoff_factor: 1.0,
            max_retries: 2,
            jitter_fraction: 0.0,
        };
        let addr: SocketAddr = "127.0.0.1:1".parse().unwrap();
        let outcome = catch_up_with_retry(addr, 0, &mut AcceptingWriter, &cfg, &shutdown);
        assert_eq!(outcome, ReconnectOutcome::MaxRetriesExceeded);
    }

    #[test]
    fn test_is_retryable_network_error() {
        assert!(is_retryable(&RepError::NetworkError("timeout".into())));
        assert!(is_retryable(&RepError::ChannelClosed("gone".into())));
        assert!(is_retryable(&RepError::FrameCorrupted {
            vlsn: 1,
            expected: 0,
            actual: 1,
        }));
        assert!(!is_retryable(&RepError::ProtocolError("bad".into())));
        assert!(!is_retryable(&RepError::DatabaseError("disk".into())));
    }
}