#![cfg_attr(
not(all(scylla_unstable, feature = "unstable-reconnect-policy")),
expect(unreachable_pub)
)]
use std::ops::RangeInclusive;
use std::time::Duration;
use rand::Rng;
pub trait ReconnectPolicySession: Send + std::fmt::Debug {
fn get_delay(&self) -> Duration;
fn on_successful_fill(&mut self);
fn on_fill_error(&mut self);
}
pub trait ReconnectPolicy: Send + Sync + std::fmt::Debug {
fn new_session(&self) -> Box<dyn ReconnectPolicySession>;
}
#[derive(Debug)]
struct HostExponentialReconnectPolicy {
min_fill_backoff: Duration,
max_fill_backoff: Duration,
jitter_range: RangeInclusive<f64>,
current_delay: Duration,
}
impl ReconnectPolicySession for HostExponentialReconnectPolicy {
fn get_delay(&self) -> Duration {
let jitter_multiplier = rand::rng().random_range(self.jitter_range.clone());
let jittered_delay = self.current_delay.mul_f64(jitter_multiplier);
jittered_delay.clamp(self.min_fill_backoff, self.max_fill_backoff)
}
fn on_successful_fill(&mut self) {
self.current_delay = self.min_fill_backoff;
}
fn on_fill_error(&mut self) {
self.current_delay =
std::cmp::min(self.max_fill_backoff, self.current_delay.saturating_mul(2));
}
}
#[derive(Debug)]
pub struct ExponentialReconnectPolicy {
min_fill_backoff: Duration,
max_fill_backoff: Duration,
jitter_range: RangeInclusive<f64>,
}
#[cfg_attr(
not(all(scylla_unstable, feature = "unstable-reconnect-policy")),
expect(dead_code)
)]
impl ExponentialReconnectPolicy {
pub fn new() -> Self {
let min_fill_backoff = Duration::from_millis(50);
Self {
min_fill_backoff,
max_fill_backoff: Duration::from_secs(10),
jitter_range: 0.85..=1.15,
}
}
pub fn with_backoff_limits(self, min: Duration, max: Duration) -> Self {
assert!(
min <= max,
"min_fill_backoff ({:?}) must be less than or equal to max_fill_backoff ({:?})",
min,
max
);
Self {
min_fill_backoff: min,
max_fill_backoff: max,
..self
}
}
pub fn with_jitter_range(mut self, jitter_range: RangeInclusive<f64>) -> Self {
assert!(!jitter_range.is_empty(), "Jitter range must not be empty");
assert!(
*jitter_range.start() >= 0.0,
"Jitter range start must be non-negative"
);
self.jitter_range = jitter_range;
self
}
}
impl Default for ExponentialReconnectPolicy {
fn default() -> Self {
Self::new()
}
}
impl ReconnectPolicy for ExponentialReconnectPolicy {
fn new_session(&self) -> Box<dyn ReconnectPolicySession> {
Box::new(HostExponentialReconnectPolicy {
min_fill_backoff: self.min_fill_backoff,
max_fill_backoff: self.max_fill_backoff,
current_delay: self.min_fill_backoff,
jitter_range: self.jitter_range.clone(),
})
}
}
#[derive(Debug, Clone)]
pub struct ConstantReconnectPolicy {
delay: Duration,
jitter_range: RangeInclusive<f64>,
}
#[cfg_attr(
not(all(scylla_unstable, feature = "unstable-reconnect-policy")),
expect(dead_code)
)]
impl ConstantReconnectPolicy {
pub fn new(duration: Duration) -> Self {
Self {
delay: duration,
jitter_range: 0.85..=1.15,
}
}
pub fn with_jitter_range(mut self, jitter_range: RangeInclusive<f64>) -> Self {
assert!(!jitter_range.is_empty(), "Jitter range must not be empty");
assert!(
*jitter_range.start() >= 0.0,
"Jitter range start must be non-negative"
);
self.jitter_range = jitter_range;
self
}
}
impl ReconnectPolicy for ConstantReconnectPolicy {
fn new_session(&self) -> Box<dyn ReconnectPolicySession> {
Box::new(self.clone())
}
}
impl ReconnectPolicySession for ConstantReconnectPolicy {
fn get_delay(&self) -> Duration {
let jitter_multiplier = rand::rng().random_range(self.jitter_range.clone());
self.delay.mul_f64(jitter_multiplier)
}
fn on_successful_fill(&mut self) {}
fn on_fill_error(&mut self) {}
}