use std::time::Duration;
use qubit_common::BoxError;
use qubit_function::{BiConsumer, BiFunction, BiPredicate, Consumer};
use crate::constants::KEY_MAX_ATTEMPTS;
use crate::event::RetryListeners;
use crate::{
AttemptFailure, AttemptFailureDecision, Retry, RetryAfterHint, RetryConfigError, RetryContext,
RetryDelay, RetryError, RetryJitter, RetryOptions,
};
/// Fluent builder that accumulates retry configuration and produces a `Retry`.
///
/// Every configuration method takes the builder by value and returns the
/// updated builder, so calls are meant to be chained and finished with
/// `build()`. Because the methods consume `self`, ignoring a returned builder
/// throws the configuration away; the `#[must_use]` attribute turns that
/// mistake into a compiler warning.
///
/// The type parameter `E` is the error type carried by attempt failures and
/// defaults to `BoxError`.
#[must_use = "builder methods return the updated builder; call `build()` to obtain a `Retry`"]
pub struct RetryBuilder<E = BoxError> {
    /// Retry options (attempt limit, elapsed budget, delay, jitter) edited by
    /// the option-related builder methods and validated in `build()`.
    options: RetryOptions,
    /// Optional timeout value handed to `Retry::new` as the attempt timeout.
    attempt_timeout: Option<Duration>,
    /// Optional hint callback that can supply a `Duration` for a failure.
    retry_after_hint: Option<RetryAfterHint<E>>,
    /// Listener collections populated by the `before_attempt`, `on_success`,
    /// `on_failure` and `on_error` builder methods.
    listeners: RetryListeners<E>,
    /// Flag forwarded to `Retry::new`; `false` until `isolate_listener_panics()`
    /// is called.
    isolate_listener_panics: bool,
    /// Deferred configuration error recorded when `max_attempts(0)` is called,
    /// so the fluent chain stays infallible and `build()` reports the problem.
    max_attempts_error: Option<RetryConfigError>,
}
impl<E> RetryBuilder<E> {
    /// Creates a builder holding `RetryOptions::default()`, default listeners,
    /// no attempt timeout, no retry-after hint, listener panic isolation
    /// disabled, and no pending configuration error.
    #[inline]
    pub fn new() -> Self {
        Self {
            options: RetryOptions::default(),
            listeners: RetryListeners::default(),
            attempt_timeout: None,
            retry_after_hint: None,
            max_attempts_error: None,
            isolate_listener_panics: false,
        }
    }

    /// Replaces the whole option set with `options`.
    ///
    /// Any error previously recorded by `max_attempts(0)` is discarded, since
    /// the attempt limit now comes from the supplied options.
    #[inline]
    pub fn options(self, options: RetryOptions) -> Self {
        Self {
            options,
            max_attempts_error: None,
            ..self
        }
    }

    /// Sets the maximum number of attempts.
    ///
    /// A value of zero is not applied; instead an invalid-value error for
    /// `KEY_MAX_ATTEMPTS` is remembered and returned later by `build()`.
    /// A non-zero value is stored and clears any remembered error.
    pub fn max_attempts(mut self, max_attempts: u32) -> Self {
        self.max_attempts_error = match std::num::NonZeroU32::new(max_attempts) {
            Some(attempts) => {
                self.options.max_attempts = attempts;
                None
            }
            None => Some(RetryConfigError::invalid_value(
                KEY_MAX_ATTEMPTS,
                "max_attempts must be greater than zero",
            )),
        };
        self
    }

    /// Sets the attempt limit to `max_retries + 1`.
    ///
    /// The addition saturates, so `u32::MAX` retries yields `u32::MAX`
    /// attempts rather than overflowing.
    #[inline]
    pub fn max_retries(self, max_retries: u32) -> Self {
        let total_attempts = max_retries.saturating_add(1);
        self.max_attempts(total_attempts)
    }

    /// Stores `max_elapsed` in the options; `None` clears the value.
    #[inline]
    pub fn max_elapsed(mut self, max_elapsed: Option<Duration>) -> Self {
        let options = &mut self.options;
        options.max_elapsed = max_elapsed;
        self
    }

    /// Sets the delay strategy stored in the options.
    #[inline]
    pub fn delay(mut self, delay: RetryDelay) -> Self {
        let options = &mut self.options;
        options.delay = delay;
        self
    }

    /// Uses `RetryDelay::none()` as the delay strategy.
    #[inline]
    pub fn no_delay(self) -> Self {
        let strategy = RetryDelay::none();
        self.delay(strategy)
    }

    /// Uses `RetryDelay::fixed(delay)` as the delay strategy.
    #[inline]
    pub fn fixed_delay(self, delay: Duration) -> Self {
        let strategy = RetryDelay::fixed(delay);
        self.delay(strategy)
    }

    /// Uses `RetryDelay::random(min, max)` as the delay strategy.
    #[inline]
    pub fn random_delay(self, min: Duration, max: Duration) -> Self {
        let strategy = RetryDelay::random(min, max);
        self.delay(strategy)
    }

    /// Uses exponential backoff between `initial` and `max` with a
    /// multiplier of `2.0`.
    #[inline]
    pub fn exponential_backoff(self, initial: Duration, max: Duration) -> Self {
        let doubling = 2.0;
        self.exponential_backoff_with_multiplier(initial, max, doubling)
    }

    /// Uses `RetryDelay::exponential(initial, max, multiplier)` as the delay
    /// strategy.
    #[inline]
    pub fn exponential_backoff_with_multiplier(
        self,
        initial: Duration,
        max: Duration,
        multiplier: f64,
    ) -> Self {
        let strategy = RetryDelay::exponential(initial, max, multiplier);
        self.delay(strategy)
    }

    /// Sets the jitter strategy stored in the options.
    #[inline]
    pub fn jitter(mut self, jitter: RetryJitter) -> Self {
        let options = &mut self.options;
        options.jitter = jitter;
        self
    }

    /// Uses `RetryJitter::factor(factor)` as the jitter strategy.
    #[inline]
    pub fn jitter_factor(self, factor: f64) -> Self {
        let strategy = RetryJitter::factor(factor);
        self.jitter(strategy)
    }

    /// Stores the attempt timeout that `build()` passes to `Retry::new`;
    /// `None` clears it.
    #[inline]
    pub fn attempt_timeout(self, attempt_timeout: Option<Duration>) -> Self {
        Self {
            attempt_timeout,
            ..self
        }
    }

    /// Installs `hint` as the retry-after hint, replacing any earlier one.
    ///
    /// The hint receives the attempt failure and the retry context and may
    /// return a `Duration`.
    pub fn retry_after_hint<H>(self, hint: H) -> Self
    where
        H: BiFunction<AttemptFailure<E>, RetryContext, Option<Duration>> + Send + Sync + 'static,
    {
        Self {
            retry_after_hint: Some(hint.into_arc()),
            ..self
        }
    }

    /// Installs a retry-after hint derived from the error value only.
    ///
    /// `hint` is consulted when `AttemptFailure::as_error` yields an error;
    /// otherwise the installed hint returns `None`.
    pub fn retry_after_from_error<H>(self, hint: H) -> Self
    where
        H: Fn(&E) -> Option<Duration> + Send + Sync + 'static,
    {
        self.retry_after_hint(
            move |failure: &AttemptFailure<E>, _ctx: &RetryContext| {
                failure.as_error().and_then(|error| hint(error))
            },
        )
    }

    /// Appends `listener` to the `before_attempt` listener list.
    pub fn before_attempt<C>(mut self, listener: C) -> Self
    where
        C: Consumer<RetryContext> + Send + Sync + 'static,
    {
        let shared = listener.into_arc();
        self.listeners.before_attempt.push(shared);
        self
    }

    /// Appends `listener` to the `attempt_success` listener list.
    pub fn on_success<C>(mut self, listener: C) -> Self
    where
        C: Consumer<RetryContext> + Send + Sync + 'static,
    {
        let shared = listener.into_arc();
        self.listeners.attempt_success.push(shared);
        self
    }

    /// Appends `listener` to the `failure` listener list.
    ///
    /// The listener maps an attempt failure and the retry context to an
    /// `AttemptFailureDecision`.
    pub fn on_failure<F>(mut self, listener: F) -> Self
    where
        F: BiFunction<AttemptFailure<E>, RetryContext, AttemptFailureDecision>
            + Send
            + Sync
            + 'static,
    {
        let shared = listener.into_arc();
        self.listeners.failure.push(shared);
        self
    }

    /// Registers a failure listener driven by `predicate`.
    ///
    /// For `AttemptFailure::Error` the listener answers `Retry` when the
    /// predicate accepts the error and `Abort` when it rejects it. For
    /// `AttemptFailure::Timeout` it answers `UseDefault`.
    pub fn retry_if_error<P>(self, predicate: P) -> Self
    where
        P: BiPredicate<E, RetryContext> + Send + Sync + 'static,
    {
        self.on_failure(
            move |failure: &AttemptFailure<E>, ctx: &RetryContext| match failure {
                AttemptFailure::Timeout => AttemptFailureDecision::UseDefault,
                AttemptFailure::Error(error) if predicate.test(error, ctx) => {
                    AttemptFailureDecision::Retry
                }
                AttemptFailure::Error(_) => AttemptFailureDecision::Abort,
            },
        )
    }

    /// Appends `listener` to the `error` listener list; it receives a
    /// `RetryError` together with the retry context.
    pub fn on_error<C>(mut self, listener: C) -> Self
    where
        C: BiConsumer<RetryError<E>, RetryContext> + Send + Sync + 'static,
    {
        let shared = listener.into_arc();
        self.listeners.error.push(shared);
        self
    }

    /// Registers a failure listener that answers `Abort` for timeouts and
    /// `UseDefault` for errors.
    pub fn abort_on_timeout(self) -> Self {
        self.on_failure(
            |failure: &AttemptFailure<E>, _ctx: &RetryContext| match failure {
                AttemptFailure::Error(_) => AttemptFailureDecision::UseDefault,
                AttemptFailure::Timeout => AttemptFailureDecision::Abort,
            },
        )
    }

    /// Registers a failure listener that answers `Retry` for timeouts and
    /// `UseDefault` for errors.
    pub fn retry_on_timeout(self) -> Self {
        self.on_failure(
            |failure: &AttemptFailure<E>, _ctx: &RetryContext| match failure {
                AttemptFailure::Error(_) => AttemptFailureDecision::UseDefault,
                AttemptFailure::Timeout => AttemptFailureDecision::Retry,
            },
        )
    }

    /// Turns on the `isolate_listener_panics` flag that `build()` forwards to
    /// `Retry::new`.
    // NOTE(review): presumably makes the executor contain panics raised by
    // listeners — confirm against `Retry`.
    #[inline]
    pub fn isolate_listener_panics(self) -> Self {
        Self {
            isolate_listener_panics: true,
            ..self
        }
    }

    /// Finishes the builder.
    ///
    /// # Errors
    ///
    /// Returns the error remembered from `max_attempts(0)` if one is pending,
    /// otherwise any error reported by `RetryOptions::validate`.
    pub fn build(self) -> Result<Retry<E>, RetryConfigError> {
        match self.max_attempts_error {
            // A zero attempt limit was requested earlier in the chain.
            Some(error) => Err(error),
            None => {
                self.options.validate()?;
                let retry = Retry::new(
                    self.options,
                    self.attempt_timeout,
                    self.retry_after_hint,
                    self.isolate_listener_panics,
                    self.listeners,
                );
                Ok(retry)
            }
        }
    }
}
impl<E> Default for RetryBuilder<E> {
#[inline]
fn default() -> Self {
Self::new()
}
}