use std::time::Duration;
use qubit_error::BoxError;
use qubit_function::{BiConsumer, BiFunction, BiPredicate, Consumer};
use crate::constants::KEY_MAX_ATTEMPTS;
use crate::event::RetryListeners;
use crate::{
AttemptFailure, AttemptFailureDecision, AttemptTimeoutOption, AttemptTimeoutPolicy, Retry,
RetryAfterHint, RetryConfigError, RetryContext, RetryDelay, RetryError, RetryJitter,
RetryOptions,
};
pub struct RetryBuilder<E = BoxError> {
options: RetryOptions,
pending_attempt_timeout_policy: AttemptTimeoutPolicy,
retry_after_hint: Option<RetryAfterHint<E>>,
listeners: RetryListeners<E>,
isolate_listener_panics: bool,
max_attempts_error: Option<RetryConfigError>,
}
impl<E> RetryBuilder<E> {
#[inline]
pub fn new() -> Self {
Self {
options: RetryOptions::default(),
pending_attempt_timeout_policy: AttemptTimeoutPolicy::default(),
retry_after_hint: None,
listeners: RetryListeners::default(),
isolate_listener_panics: false,
max_attempts_error: None,
}
}
#[inline]
pub fn options(mut self, options: RetryOptions) -> Self {
self.pending_attempt_timeout_policy = options
.attempt_timeout()
.map(|attempt_timeout| attempt_timeout.policy())
.unwrap_or_default();
self.options = options;
self.max_attempts_error = None;
self
}
pub fn max_attempts(mut self, max_attempts: u32) -> Self {
if let Some(max_attempts) = std::num::NonZeroU32::new(max_attempts) {
self.options.max_attempts = max_attempts;
self.max_attempts_error = None;
} else {
self.max_attempts_error = Some(RetryConfigError::invalid_value(
KEY_MAX_ATTEMPTS,
"max_attempts must be greater than zero",
));
}
self
}
#[inline]
pub fn max_retries(self, max_retries: u32) -> Self {
self.max_attempts(max_retries.saturating_add(1))
}
#[inline]
pub fn max_operation_elapsed(mut self, max_operation_elapsed: Option<Duration>) -> Self {
self.options.max_operation_elapsed = max_operation_elapsed;
self
}
#[inline]
pub fn max_total_elapsed(mut self, max_total_elapsed: Option<Duration>) -> Self {
self.options.max_total_elapsed = max_total_elapsed;
self
}
#[inline]
pub fn delay(mut self, delay: RetryDelay) -> Self {
self.options.delay = delay;
self
}
#[inline]
pub fn no_delay(self) -> Self {
self.delay(RetryDelay::none())
}
#[inline]
pub fn fixed_delay(self, delay: Duration) -> Self {
self.delay(RetryDelay::fixed(delay))
}
#[inline]
pub fn random_delay(self, min: Duration, max: Duration) -> Self {
self.delay(RetryDelay::random(min, max))
}
#[inline]
pub fn exponential_backoff(self, initial: Duration, max: Duration) -> Self {
self.exponential_backoff_with_multiplier(initial, max, 2.0)
}
#[inline]
pub fn exponential_backoff_with_multiplier(
self,
initial: Duration,
max: Duration,
multiplier: f64,
) -> Self {
self.delay(RetryDelay::exponential(initial, max, multiplier))
}
#[inline]
pub fn jitter(mut self, jitter: RetryJitter) -> Self {
self.options.jitter = jitter;
self
}
#[inline]
pub fn jitter_factor(self, factor: f64) -> Self {
self.jitter(RetryJitter::factor(factor))
}
#[inline]
pub fn attempt_timeout(mut self, attempt_timeout: Option<Duration>) -> Self {
if let Some(timeout) = attempt_timeout {
self.options.attempt_timeout = Some(AttemptTimeoutOption::new(
timeout,
self.pending_attempt_timeout_policy,
));
} else {
self.pending_attempt_timeout_policy = AttemptTimeoutPolicy::default();
self.options.attempt_timeout = None;
}
self
}
#[inline]
pub fn attempt_timeout_option(mut self, attempt_timeout: Option<AttemptTimeoutOption>) -> Self {
if let Some(attempt_timeout) = attempt_timeout {
self.pending_attempt_timeout_policy = attempt_timeout.policy();
} else {
self.pending_attempt_timeout_policy = AttemptTimeoutPolicy::default();
}
self.options.attempt_timeout = attempt_timeout;
self
}
#[inline]
pub fn attempt_timeout_policy(mut self, policy: AttemptTimeoutPolicy) -> Self {
self.pending_attempt_timeout_policy = policy;
self.options.attempt_timeout = self
.options
.attempt_timeout
.map(|attempt_timeout| attempt_timeout.with_policy(policy));
self
}
#[inline]
pub fn worker_cancel_grace(mut self, grace: Duration) -> Self {
self.options.worker_cancel_grace = grace;
self
}
pub fn retry_after_hint<H>(mut self, hint: H) -> Self
where
H: BiFunction<AttemptFailure<E>, RetryContext, Option<Duration>> + Send + Sync + 'static,
{
self.retry_after_hint = Some(hint.into_arc());
self
}
pub fn retry_after_from_error<H>(self, hint: H) -> Self
where
H: Fn(&E) -> Option<Duration> + Send + Sync + 'static,
{
self.retry_after_hint(
move |failure: &AttemptFailure<E>, _context: &RetryContext| {
failure.as_error().and_then(&hint)
},
)
}
pub fn before_attempt<C>(mut self, listener: C) -> Self
where
C: Consumer<RetryContext> + Send + Sync + 'static,
{
self.listeners.before_attempt.push(listener.into_arc());
self
}
pub fn on_success<C>(mut self, listener: C) -> Self
where
C: Consumer<RetryContext> + Send + Sync + 'static,
{
self.listeners.attempt_success.push(listener.into_arc());
self
}
pub fn on_failure<F>(mut self, listener: F) -> Self
where
F: BiFunction<AttemptFailure<E>, RetryContext, AttemptFailureDecision>
+ Send
+ Sync
+ 'static,
{
self.listeners.failure.push(listener.into_arc());
self
}
pub fn on_retry<C>(mut self, listener: C) -> Self
where
C: BiConsumer<AttemptFailure<E>, RetryContext> + Send + Sync + 'static,
{
self.listeners.retry_scheduled.push(listener.into_arc());
self
}
pub fn retry_if_error<P>(self, predicate: P) -> Self
where
P: BiPredicate<E, RetryContext> + Send + Sync + 'static,
{
self.on_failure(
move |failure: &AttemptFailure<E>, context: &RetryContext| match failure {
AttemptFailure::Error(error) => {
if predicate.test(error, context) {
AttemptFailureDecision::Retry
} else {
AttemptFailureDecision::Abort
}
}
AttemptFailure::Timeout
| AttemptFailure::Panic(_)
| AttemptFailure::Executor(_) => AttemptFailureDecision::UseDefault,
},
)
}
pub fn on_error<C>(mut self, listener: C) -> Self
where
C: BiConsumer<RetryError<E>, RetryContext> + Send + Sync + 'static,
{
self.listeners.error.push(listener.into_arc());
self
}
pub fn abort_on_timeout(self) -> Self {
self.attempt_timeout_policy(AttemptTimeoutPolicy::Abort)
}
pub fn retry_on_timeout(self) -> Self {
self.attempt_timeout_policy(AttemptTimeoutPolicy::Retry)
}
#[inline]
pub fn isolate_listener_panics(mut self) -> Self {
self.isolate_listener_panics = true;
self
}
pub fn build(self) -> Result<Retry<E>, RetryConfigError> {
if let Some(error) = self.max_attempts_error {
return Err(error);
}
self.options.validate()?;
Ok(Retry::new(
self.options,
self.retry_after_hint,
self.isolate_listener_panics,
self.listeners,
))
}
}
impl<E> Default for RetryBuilder<E> {
#[inline]
fn default() -> Self {
Self::new()
}
}