scylla 1.6.0

Async CQL driver for Rust, optimized for ScyllaDB, fully compatible with Apache Cassandraâ„¢
Documentation
#![cfg_attr(
    not(all(scylla_unstable, feature = "unstable-reconnect-policy")),
    expect(unreachable_pub)
)]

//! Policy that controls delays between connection pool fill attempts.
//!
//! UNSTABLE API, guarded by `scylla_unstable` cfg and `unstable-reconnect-policy` feature.

use std::ops::RangeInclusive;
use std::time::Duration;

use rand::Rng;

/// Generated by [ReconnectPolicy] to control the delay between connection
/// attempts in a single connection pool.
///
/// In the current implementation, pool performs fills in batches.
/// [`on_fill_error()`](Self::on_fill_error) is called if any connection attempt in the batch failed.
/// [`on_successful_fill()`](Self::on_successful_fill) is called if all connection attempts in the batch succeeded.
pub trait ReconnectPolicySession: Send + std::fmt::Debug {
    /// Tells the pool how long to wait before attempting to fill connections to the host.
    fn get_delay(&self) -> Duration;
    /// Called when all connection attempts in the batch succeeded.
    fn on_successful_fill(&mut self);
    /// Called when any connection attempt in the batch failed.
    fn on_fill_error(&mut self);
}

/// Policy for controlling delays between connection pool fill attempts.
///
/// Because such policy may require mutable state (to handle fill failures),
/// this trait is not used directly by connection pools. Instead, the only
/// purpose of this trait is to generate boxed instances of per-host policy objects.
/// The generated objects are used by a single pool, and so can have methods with `&mut self`.
pub trait ReconnectPolicy: Send + Sync + std::fmt::Debug {
    /// Returns a new instance of the policy for the single host.
    fn new_session(&self) -> Box<dyn ReconnectPolicySession>;
}

#[derive(Debug)]
struct HostExponentialReconnectPolicy {
    min_fill_backoff: Duration,
    max_fill_backoff: Duration,
    jitter_range: RangeInclusive<f64>,
    current_delay: Duration,
}

impl ReconnectPolicySession for HostExponentialReconnectPolicy {
    fn get_delay(&self) -> Duration {
        // The clone below is cheap. Ranges intentionally don't implement Copy, but for other reasons.
        // See: https://github.com/rust-lang/rust/pull/27186
        let jitter_multiplier = rand::rng().random_range(self.jitter_range.clone());
        let jittered_delay = self.current_delay.mul_f64(jitter_multiplier);
        jittered_delay.clamp(self.min_fill_backoff, self.max_fill_backoff)
    }

    fn on_successful_fill(&mut self) {
        self.current_delay = self.min_fill_backoff;
    }

    fn on_fill_error(&mut self) {
        self.current_delay =
            std::cmp::min(self.max_fill_backoff, self.current_delay.saturating_mul(2));
    }
}

/// Reconnect policy that exponentially increases the delay between connection pool fill attempts.
///
/// Minimum and maximum delays can be configured.
///
/// A random amount of jitter (from configurable range) will be applied to each delay.
/// Delay after applying jitter will be clamped to the configured minimum and maximum delays.
/// This helps with the situation where multiple clients are trying to reconnect simultaneously.
///
/// When creating the policy, make sure that the effective delay value will be reasonable
/// after multiplying by any value from the jitter range (by default: 0.85..=1.15).
/// If, for example, jitter range is close to 0 (let's say, 0.01..=1.0), then effective delay
/// may get close to 0 (when value close to 0.01 is chosen).
#[derive(Debug)]
pub struct ExponentialReconnectPolicy {
    min_fill_backoff: Duration,
    max_fill_backoff: Duration,
    jitter_range: RangeInclusive<f64>,
}

#[cfg_attr(
    not(all(scylla_unstable, feature = "unstable-reconnect-policy")),
    expect(dead_code)
)]
impl ExponentialReconnectPolicy {
    /// Creates a new exponential reconnect policy with default values.
    pub fn new() -> Self {
        let min_fill_backoff = Duration::from_millis(50);
        Self {
            min_fill_backoff,
            max_fill_backoff: Duration::from_secs(10),
            jitter_range: 0.85..=1.15,
        }
    }

    /// Configures the minimum and maximum backoff durations for exponential backoff.
    ///
    /// The delay between fill attempts will be exponentially increased starting from `min` up to `max`.
    pub fn with_backoff_limits(self, min: Duration, max: Duration) -> Self {
        assert!(
            min <= max,
            "min_fill_backoff ({:?}) must be less than or equal to max_fill_backoff ({:?})",
            min,
            max
        );
        Self {
            min_fill_backoff: min,
            max_fill_backoff: max,
            ..self
        }
    }

    /// Sets the jitter range for the delay between fill attempts.
    ///
    /// On each call to `get_delay`, value will be sampled from the provided range.
    /// The delay will be multiplied by this value (and then clamped between `min_fill_backoff` and `max_fill_backoff`).
    /// You can use 1.0..=1.0 to effectively disable jitter.
    ///
    /// Default value is 0.85..=1.15.
    pub fn with_jitter_range(mut self, jitter_range: RangeInclusive<f64>) -> Self {
        assert!(!jitter_range.is_empty(), "Jitter range must not be empty");
        assert!(
            *jitter_range.start() >= 0.0,
            "Jitter range start must be non-negative"
        );
        self.jitter_range = jitter_range;
        self
    }
}

impl Default for ExponentialReconnectPolicy {
    fn default() -> Self {
        Self::new()
    }
}

impl ReconnectPolicy for ExponentialReconnectPolicy {
    fn new_session(&self) -> Box<dyn ReconnectPolicySession> {
        Box::new(HostExponentialReconnectPolicy {
            min_fill_backoff: self.min_fill_backoff,
            max_fill_backoff: self.max_fill_backoff,
            current_delay: self.min_fill_backoff,
            jitter_range: self.jitter_range.clone(),
        })
    }
}

/// Constant reconnect policy.
/// This policy always returns constant delay with randomized jitter applied to it.
/// This helps with the situation where multiple clients are trying to reconnect simultaneously.
///
/// When creating the policy, make sure that the effective delay value will be reasonable
/// after multiplying by any value from the jitter range (by default: 0.85..=1.15).
/// If, for example, jitter range is close to 0 (let's say, 0.01..=1.0), then effective delay
/// may get close to 0 (when value close to 0.01 is chosen).
#[derive(Debug, Clone)]
pub struct ConstantReconnectPolicy {
    delay: Duration,
    jitter_range: RangeInclusive<f64>,
}

#[cfg_attr(
    not(all(scylla_unstable, feature = "unstable-reconnect-policy")),
    expect(dead_code)
)]
impl ConstantReconnectPolicy {
    /// Creates a new constant reconnect policy with the given delay.
    pub fn new(duration: Duration) -> Self {
        Self {
            delay: duration,
            jitter_range: 0.85..=1.15,
        }
    }

    /// Sets the jitter range for the delay between fill attempts.
    ///
    /// On each call to `get_delay`, value will be sampled from the provided range.
    /// The delay will be multiplied by this value.
    /// You can use 1.0..=1.0 to effectively disable jitter.
    ///
    /// Default value is 0.85..=1.15.
    pub fn with_jitter_range(mut self, jitter_range: RangeInclusive<f64>) -> Self {
        assert!(!jitter_range.is_empty(), "Jitter range must not be empty");
        assert!(
            *jitter_range.start() >= 0.0,
            "Jitter range start must be non-negative"
        );
        self.jitter_range = jitter_range;
        self
    }
}

impl ReconnectPolicy for ConstantReconnectPolicy {
    fn new_session(&self) -> Box<dyn ReconnectPolicySession> {
        Box::new(self.clone())
    }
}

// ConstantReconnectPolicy doesn't have any mutable state,
// so we can avoid having another struct for per-host policy.
impl ReconnectPolicySession for ConstantReconnectPolicy {
    fn get_delay(&self) -> Duration {
        // The clone below is cheap. Ranges intentionally don't implement Copy, but for other reasons.
        // See: https://github.com/rust-lang/rust/pull/27186
        let jitter_multiplier = rand::rng().random_range(self.jitter_range.clone());
        self.delay.mul_f64(jitter_multiplier)
    }

    fn on_successful_fill(&mut self) {}

    fn on_fill_error(&mut self) {}
}