use std::marker::PhantomData;
use std::time::Duration;
use qubit_common::BoxError;
use qubit_retry::{RetryConfigError, RetryDelay, RetryJitter, RetryOptions};
use crate::observability::{CasObservabilityConfig, ContentionThresholds, ListenerPanicPolicy};
use crate::options::CasTimeoutPolicy;
use crate::strategy::CasStrategy;
use super::cas_executor::CasExecutor;
/// Consuming builder that collects retry limits, delay/jitter settings, attempt-timeout
/// handling and observability settings, and turns them into a `CasExecutor<T, E>`.
///
/// `T` and `E` are not stored; they are only carried through to the executor type
/// produced by `build`. `E` defaults to `BoxError`.
pub struct CasBuilder<T, E = BoxError> {
/// Attempt limit forwarded to `RetryOptions::new`. The `max_attempts` setter never
/// stores zero here; it records an error instead.
max_attempts: u32,
/// Optional elapsed limit forwarded to `RetryOptions::new`.
/// NOTE(review): presumably bounds time spent inside the operation itself — confirm
/// against the `qubit_retry` documentation.
max_operation_elapsed: Option<Duration>,
/// Optional elapsed limit forwarded to `RetryOptions::new`.
/// NOTE(review): presumably bounds the whole execution including delays — confirm
/// against the `qubit_retry` documentation.
max_total_elapsed: Option<Duration>,
/// Retry delay strategy forwarded to `RetryOptions::new`.
delay: RetryDelay,
/// Retry jitter setting forwarded to `RetryOptions::new`.
jitter: RetryJitter,
/// Optional attempt timeout handed to `CasExecutor::new`; `None` in a fresh builder.
attempt_timeout: Option<Duration>,
/// Timeout policy handed to `CasExecutor::new`; `CasTimeoutPolicy::Retry` in a fresh builder.
timeout_policy: CasTimeoutPolicy,
/// Observability configuration handed to `CasExecutor::new`.
observability: CasObservabilityConfig,
/// Deferred validation error recorded when `max_attempts` is called with zero.
/// `build` returns it before doing anything else.
max_attempts_error: Option<RetryConfigError>,
/// Ties `T` and `E` to the builder without storing values of either type. The
/// `fn() -> (T, E)` form means the marker itself does not own a `T` or an `E`.
marker: PhantomData<fn() -> (T, E)>,
}
impl<T, E> CasBuilder<T, E> {
/// Creates a builder seeded from `RetryOptions::default()`.
///
/// The attempt limit, both elapsed limits, the delay and the jitter are copied from
/// the default retry options. The attempt timeout starts as `None`, the timeout
/// policy as `CasTimeoutPolicy::Retry`, and observability as its `Default` value.
pub fn new() -> Self {
    let defaults = RetryOptions::default();
    let max_attempts = defaults.max_attempts();
    let max_operation_elapsed = defaults.max_operation_elapsed();
    let max_total_elapsed = defaults.max_total_elapsed();
    let delay = defaults.delay().clone();
    let jitter = defaults.jitter();

    Self {
        max_attempts,
        max_operation_elapsed,
        max_total_elapsed,
        delay,
        jitter,
        attempt_timeout: None,
        timeout_policy: CasTimeoutPolicy::Retry,
        observability: CasObservabilityConfig::default(),
        max_attempts_error: None,
        marker: PhantomData,
    }
}
pub fn options(mut self, options: RetryOptions) -> Self {
self.max_attempts = options.max_attempts();
self.max_operation_elapsed = options.max_operation_elapsed();
self.max_total_elapsed = options.max_total_elapsed();
self.delay = options.delay().clone();
self.jitter = options.jitter();
self.max_attempts_error = None;
self
}
/// Sets the attempt limit.
///
/// Zero is not stored. Instead an invalid-value error is recorded and later returned
/// by `build`, while the previously configured limit stays in place. Any non-zero
/// value is stored and clears a previously recorded error.
pub fn max_attempts(mut self, max_attempts: u32) -> Self {
    self.max_attempts_error = match max_attempts {
        0 => Some(RetryConfigError::invalid_value(
            qubit_retry::constants::KEY_MAX_ATTEMPTS,
            "max_attempts must be greater than zero",
        )),
        accepted => {
            self.max_attempts = accepted;
            None
        }
    };
    self
}
/// Sets the attempt limit to `max_retries + 1`.
///
/// The addition saturates at `u32::MAX`, so the resulting limit is never zero.
#[inline]
pub fn max_retries(self, max_retries: u32) -> Self {
    let attempts = max_retries.saturating_add(1);
    self.max_attempts(attempts)
}
#[inline]
pub fn max_operation_elapsed(mut self, max_operation_elapsed: Option<Duration>) -> Self {
self.max_operation_elapsed = max_operation_elapsed;
self
}
#[inline]
pub fn max_total_elapsed(mut self, max_total_elapsed: Option<Duration>) -> Self {
self.max_total_elapsed = max_total_elapsed;
self
}
#[inline]
pub fn delay(mut self, delay: RetryDelay) -> Self {
self.delay = delay;
self
}
/// Replaces the retry delay strategy with `RetryDelay::none()`.
#[inline]
pub fn no_delay(self) -> Self {
    let none = RetryDelay::none();
    self.delay(none)
}
/// Replaces the retry delay strategy with `RetryDelay::fixed(delay)`.
#[inline]
pub fn fixed_delay(self, delay: Duration) -> Self {
    let fixed = RetryDelay::fixed(delay);
    self.delay(fixed)
}
/// Replaces the retry delay strategy with `RetryDelay::random(min, max)`.
#[inline]
pub fn random_delay(self, min: Duration, max: Duration) -> Self {
    let random = RetryDelay::random(min, max);
    self.delay(random)
}
/// Configures exponential backoff between `initial` and `max` using a multiplier
/// of `2.0`.
///
/// Shorthand for `exponential_backoff_with_multiplier(initial, max, 2.0)`.
#[inline]
pub fn exponential_backoff(self, initial: Duration, max: Duration) -> Self {
    // Multiplier applied when the caller does not choose one.
    const DEFAULT_MULTIPLIER: f64 = 2.0;
    self.exponential_backoff_with_multiplier(initial, max, DEFAULT_MULTIPLIER)
}
/// Replaces the retry delay strategy with
/// `RetryDelay::exponential(initial, max, multiplier)`.
///
/// No validation of the arguments happens here; they are passed through unchanged.
#[inline]
pub fn exponential_backoff_with_multiplier(
    self,
    initial: Duration,
    max: Duration,
    multiplier: f64,
) -> Self {
    let backoff = RetryDelay::exponential(initial, max, multiplier);
    self.delay(backoff)
}
#[inline]
pub fn jitter(mut self, jitter: RetryJitter) -> Self {
self.jitter = jitter;
self
}
/// Replaces the retry jitter setting with `RetryJitter::factor(factor)`.
///
/// The factor is passed through unchanged; no range check happens here.
#[inline]
pub fn jitter_factor(self, factor: f64) -> Self {
    let scaled = RetryJitter::factor(factor);
    self.jitter(scaled)
}
#[inline]
pub fn attempt_timeout(mut self, attempt_timeout: Option<Duration>) -> Self {
self.attempt_timeout = attempt_timeout;
self
}
#[inline]
pub fn retry_on_timeout(mut self) -> Self {
self.timeout_policy = CasTimeoutPolicy::Retry;
self
}
#[inline]
pub fn abort_on_timeout(mut self) -> Self {
self.timeout_policy = CasTimeoutPolicy::Abort;
self
}
/// Applies the preset described by `strategy`, overwriting earlier settings.
///
/// The attempt limit and both elapsed limits are taken from `strategy.profile()`.
/// If `strategy.backoff()` yields `(initial, max, jitter)`, exponential backoff
/// between `initial` and `max` and a jitter factor of `jitter` are configured.
/// Otherwise the delay is set to none and the current jitter setting is kept.
pub fn strategy(self, strategy: CasStrategy) -> Self {
    let profile = strategy.profile();
    let limited = self
        .max_attempts(profile.max_attempts())
        .max_operation_elapsed(Some(profile.max_operation_elapsed()))
        .max_total_elapsed(profile.max_total_elapsed());

    match strategy.backoff() {
        Some((initial, max, jitter)) => limited
            .exponential_backoff(initial, max)
            .jitter_factor(jitter),
        None => limited.no_delay(),
    }
}
#[inline]
pub fn observability(mut self, observability: CasObservabilityConfig) -> Self {
self.observability = observability;
self
}
#[inline]
pub fn alert_on_contention(mut self, thresholds: ContentionThresholds) -> Self {
self.observability = self.observability.with_contention_thresholds(thresholds);
self
}
#[inline]
pub fn isolate_listener_panics(mut self) -> Self {
self.observability = self
.observability
.with_listener_panic_policy(ListenerPanicPolicy::Isolate);
self
}
pub fn build(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
if let Some(error) = self.max_attempts_error {
return Err(error);
}
let options = RetryOptions::new(
self.max_attempts,
self.max_operation_elapsed,
self.max_total_elapsed,
self.delay,
self.jitter,
)?;
Ok(CasExecutor::new(
options,
self.attempt_timeout,
self.timeout_policy,
self.observability,
))
}
/// Applies `CasStrategy::ContentionAdaptive` via `strategy` and then builds.
///
/// The strategy overwrites the attempt limit, elapsed limits and delay set earlier
/// (and the jitter, when the strategy defines a backoff).
///
/// # Errors
///
/// Returns any error produced by `build`.
pub fn build_contention_adaptive(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
    let configured = self.strategy(CasStrategy::ContentionAdaptive);
    configured.build()
}
/// Applies `CasStrategy::LatencyFirst` via `strategy` and then builds.
///
/// The strategy overwrites the attempt limit, elapsed limits and delay set earlier
/// (and the jitter, when the strategy defines a backoff).
///
/// # Errors
///
/// Returns any error produced by `build`.
pub fn build_latency_first(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
    let configured = self.strategy(CasStrategy::LatencyFirst);
    configured.build()
}
/// Applies `CasStrategy::ReliabilityFirst` via `strategy` and then builds.
///
/// The strategy overwrites the attempt limit, elapsed limits and delay set earlier
/// (and the jitter, when the strategy defines a backoff).
///
/// # Errors
///
/// Returns any error produced by `build`.
pub fn build_reliability_first(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
    let configured = self.strategy(CasStrategy::ReliabilityFirst);
    configured.build()
}
}
impl<T, E> Default for CasBuilder<T, E> {
#[inline]
fn default() -> Self {
Self::new()
}
}