use std::any::TypeId;
use std::collections::HashSet;
use std::error::Error;
use std::time::Duration;
use qubit_function::Consumer;
use qubit_function::{BoxPredicate, Predicate};
use super::event::{
AbortEventListener, FailureEventListener, RetryEventListener, SuccessEventListener,
};
use super::{
AbortEvent, AbortReason, DefaultRetryConfig, FailureEvent, RetryConfig, RetryDecision,
RetryDelayStrategy, RetryEvent, RetryReason, SuccessEvent,
};
/// Optional boxed predicate over a `T`; `None` means no predicate has been configured.
type ConditionPredicate<T> = Option<BoxPredicate<T>>;
/// Tests whether an error belongs to one registered error type.
///
/// A plain function pointer is enough because every matcher is a
/// monomorphised function (`matches_error_type::<E>`) with no captured state.
type ErrorMatcher = fn(&(dyn Error + 'static)) -> bool;

/// Registered error rules, keyed by the [`TypeId`] they were registered for so
/// that registering the same type twice keeps a single entry.
type ErrorMatchers = std::collections::HashMap<TypeId, ErrorMatcher>;

/// Builder that collects retry configuration, retry/abort rules and event
/// listeners, and turns them into a `RetryExecutor` via [`RetryBuilder::build`].
///
/// * "failed" rules describe outcomes that should be retried;
/// * "abort" rules describe outcomes that stop retrying immediately.
///
/// Error rules match on the concrete type of the error (`<dyn Error>::is`), so
/// an error wrapped inside another error type matches the wrapper's type, not
/// the inner one.
pub struct RetryBuilder<T, C: RetryConfig = DefaultRetryConfig> {
    /// Retry configuration that the `max_*`, timeout, delay and jitter accessors delegate to.
    config: C,
    /// Error types that should be retried; empty means "retry every error".
    failed_error_types: ErrorMatchers,
    /// Result values that should be retried.
    failed_results: HashSet<T>,
    /// Optional predicate marking additional result values as retryable.
    failed_condition: ConditionPredicate<T>,
    /// Error types that abort immediately; empty means "no error aborts".
    abort_error_types: ErrorMatchers,
    /// Result values that abort immediately.
    abort_results: HashSet<T>,
    /// Optional predicate marking additional result values as aborting.
    abort_condition: ConditionPredicate<T>,
    /// Listener for retry events, if any.
    on_retry: Option<RetryEventListener<T>>,
    /// Listener for success events, if any.
    on_success: Option<SuccessEventListener<T>>,
    /// Listener for failure events, if any.
    on_failure: Option<FailureEventListener<T>>,
    /// Listener for abort events, if any.
    on_abort: Option<AbortEventListener<T>>,
}

impl<T> RetryBuilder<T, DefaultRetryConfig>
where
    T: Clone + PartialEq + Eq + std::hash::Hash + Send + Sync + 'static,
{
    /// Creates a builder backed by `DefaultRetryConfig::new()` with no rules
    /// and no listeners.
    pub fn new() -> Self {
        Self::with_config(DefaultRetryConfig::new())
    }
}

impl<T, C> RetryBuilder<T, C>
where
    T: Clone + PartialEq + Eq + std::hash::Hash + Send + Sync + 'static,
    C: RetryConfig,
{
    /// Creates a builder around an existing configuration, with no rules and
    /// no listeners.
    pub fn with_config(config: C) -> Self {
        Self {
            config,
            failed_error_types: ErrorMatchers::new(),
            failed_results: HashSet::new(),
            failed_condition: None,
            abort_error_types: ErrorMatchers::new(),
            abort_results: HashSet::new(),
            abort_condition: None,
            on_retry: None,
            on_success: None,
            on_failure: None,
            on_abort: None,
        }
    }

    // ---------------------------------------------------------------------
    // Configuration pass-throughs: every method below forwards to `config`.
    // ---------------------------------------------------------------------

    /// Returns the configuration's `max_attempts()`.
    pub fn max_attempts(&self) -> u32 {
        self.config.max_attempts()
    }

    /// Forwards `max_attempts` to the configuration and returns the builder.
    pub fn set_max_attempts(mut self, max_attempts: u32) -> Self {
        self.config.set_max_attempts(max_attempts);
        self
    }

    /// Returns the configuration's `max_duration()`.
    pub fn max_duration(&self) -> Option<Duration> {
        self.config.max_duration()
    }

    /// Forwards `max_duration` to the configuration and returns the builder.
    pub fn set_max_duration(mut self, max_duration: Option<Duration>) -> Self {
        self.config.set_max_duration(max_duration);
        self
    }

    /// Returns the configuration's `operation_timeout()`.
    pub fn operation_timeout(&self) -> Option<Duration> {
        self.config.operation_timeout()
    }

    /// Forwards `timeout` to the configuration and returns the builder.
    pub fn set_operation_timeout(mut self, timeout: Option<Duration>) -> Self {
        self.config.set_operation_timeout(timeout);
        self
    }

    /// Calls the configuration's `set_unlimited_operation_timeout()` and
    /// returns the builder.
    pub fn set_unlimited_operation_timeout(mut self) -> Self {
        self.config.set_unlimited_operation_timeout();
        self
    }

    /// Returns the configuration's `delay_strategy()`.
    pub fn delay_strategy(&self) -> RetryDelayStrategy {
        self.config.delay_strategy()
    }

    /// Forwards `delay_strategy` to the configuration and returns the builder.
    pub fn set_delay_strategy(mut self, delay_strategy: RetryDelayStrategy) -> Self {
        self.config.set_delay_strategy(delay_strategy);
        self
    }

    /// Returns the configuration's `jitter_factor()`.
    pub fn jitter_factor(&self) -> f64 {
        self.config.jitter_factor()
    }

    /// Forwards `jitter_factor` to the configuration and returns the builder.
    pub fn set_jitter_factor(mut self, jitter_factor: f64) -> Self {
        self.config.set_jitter_factor(jitter_factor);
        self
    }

    /// Calls the configuration's `set_random_delay_strategy(min_delay, max_delay)`
    /// and returns the builder.
    pub fn set_random_delay_strategy(mut self, min_delay: Duration, max_delay: Duration) -> Self {
        self.config.set_random_delay_strategy(min_delay, max_delay);
        self
    }

    /// Calls the configuration's `set_fixed_delay_strategy(delay)` and returns
    /// the builder.
    pub fn set_fixed_delay_strategy(mut self, delay: Duration) -> Self {
        self.config.set_fixed_delay_strategy(delay);
        self
    }

    /// Calls the configuration's
    /// `set_exponential_backoff_strategy(initial_delay, max_delay, multiplier)`
    /// and returns the builder.
    pub fn set_exponential_backoff_strategy(
        mut self,
        initial_delay: Duration,
        max_delay: Duration,
        multiplier: f64,
    ) -> Self {
        self.config
            .set_exponential_backoff_strategy(initial_delay, max_delay, multiplier);
        self
    }

    /// Calls the configuration's `set_no_delay_strategy()` and returns the builder.
    pub fn set_no_delay_strategy(mut self) -> Self {
        self.config.set_no_delay_strategy();
        self
    }

    /// Calls the configuration's `set_unlimited_duration()` and returns the builder.
    pub fn set_unlimited_duration(mut self) -> Self {
        self.config.set_unlimited_duration();
        self
    }

    // ---------------------------------------------------------------------
    // Retry ("failed") rules.
    // ---------------------------------------------------------------------

    /// Declares that no error is retryable.
    ///
    /// Replaces the retryable error types with the private sentinel type
    /// `NonExistentError`. No real error has that type, so the set is
    /// non-empty yet matches nothing, and `should_retry_error` returns `false`.
    pub fn no_failed_errors(mut self) -> Self {
        self.failed_error_types.clear();
        self.failed_error_types.insert(
            TypeId::of::<NonExistentError>(),
            matches_error_type::<NonExistentError> as ErrorMatcher,
        );
        self
    }

    /// Declares every error retryable, replacing any previous retryable error types.
    pub fn failed_on_all_errors(mut self) -> Self {
        self.failed_error_types.clear();
        self.failed_error_types
            .insert(TypeId::of::<dyn Error>(), matches_any_error as ErrorMatcher);
        self
    }

    /// Declares errors of concrete type `E` retryable, replacing any previous
    /// retryable error types.
    pub fn failed_on_error<E: Error + 'static>(mut self) -> Self {
        self.failed_error_types.clear();
        self.failed_error_types
            .insert(TypeId::of::<E>(), matches_error_type::<E> as ErrorMatcher);
        self
    }

    /// Declares errors of concrete type `E1` or `E2` retryable, replacing any
    /// previous retryable error types.
    pub fn failed_on_errors<E1: Error + 'static, E2: Error + 'static>(mut self) -> Self {
        self.failed_error_types.clear();
        self.failed_error_types
            .insert(TypeId::of::<E1>(), matches_error_type::<E1> as ErrorMatcher);
        self.failed_error_types
            .insert(TypeId::of::<E2>(), matches_error_type::<E2> as ErrorMatcher);
        self
    }

    /// Makes `result` the only retryable result value (the predicate set by
    /// `failed_on_results_if` is left untouched).
    pub fn failed_on_result(mut self, result: T) -> Self {
        self.failed_results.clear();
        self.failed_results.insert(result);
        self
    }

    /// Replaces the retryable result values with `results` (the predicate set
    /// by `failed_on_results_if` is left untouched).
    pub fn failed_on_results(mut self, results: Vec<T>) -> Self {
        self.failed_results.clear();
        self.failed_results.extend(results);
        self
    }

    /// Sets the predicate that marks additional result values as retryable,
    /// replacing any previously set predicate.
    pub fn failed_on_results_if<P>(mut self, condition: P) -> Self
    where
        P: Fn(&T) -> bool + 'static,
    {
        self.failed_condition = Some(condition.into_box());
        self
    }

    /// Removes all retryable result values. The predicate set by
    /// `failed_on_results_if` is not cleared.
    pub fn clear_failed_results(mut self) -> Self {
        self.failed_results.clear();
        self
    }

    // ---------------------------------------------------------------------
    // Abort rules.
    // ---------------------------------------------------------------------

    /// Declares that errors of concrete type `E` abort immediately, replacing
    /// any previous aborting error types.
    pub fn abort_on_error<E: Error + 'static>(mut self) -> Self {
        self.abort_error_types.clear();
        self.abort_error_types
            .insert(TypeId::of::<E>(), matches_error_type::<E> as ErrorMatcher);
        self
    }

    /// Declares that errors of concrete type `E1` or `E2` abort immediately,
    /// replacing any previous aborting error types.
    pub fn abort_on_errors<E1: Error + 'static, E2: Error + 'static>(mut self) -> Self {
        self.abort_error_types.clear();
        self.abort_error_types
            .insert(TypeId::of::<E1>(), matches_error_type::<E1> as ErrorMatcher);
        self.abort_error_types
            .insert(TypeId::of::<E2>(), matches_error_type::<E2> as ErrorMatcher);
        self
    }

    /// Declares that every error aborts immediately, replacing any previous
    /// aborting error types.
    pub fn abort_on_all_errors(mut self) -> Self {
        self.abort_error_types.clear();
        self.abort_error_types
            .insert(TypeId::of::<dyn Error>(), matches_any_error as ErrorMatcher);
        self
    }

    /// Makes `result` the only aborting result value (the predicate set by
    /// `abort_on_results_if` is left untouched).
    pub fn abort_on_result(mut self, result: T) -> Self {
        self.abort_results.clear();
        self.abort_results.insert(result);
        self
    }

    /// Replaces the aborting result values with `results` (the predicate set
    /// by `abort_on_results_if` is left untouched).
    pub fn abort_on_results(mut self, results: Vec<T>) -> Self {
        self.abort_results.clear();
        self.abort_results.extend(results);
        self
    }

    /// Sets the predicate that marks additional result values as aborting,
    /// replacing any previously set predicate.
    pub fn abort_on_results_if<P>(mut self, condition: P) -> Self
    where
        P: Fn(&T) -> bool + 'static,
    {
        self.abort_condition = Some(condition.into_box());
        self
    }

    /// Removes all aborting result values. The predicate set by
    /// `abort_on_results_if` is not cleared.
    pub fn clear_abort_results(mut self) -> Self {
        self.abort_results.clear();
        self
    }

    // ---------------------------------------------------------------------
    // Event listeners: each setter boxes the listener and replaces any
    // previously registered one of the same kind.
    // ---------------------------------------------------------------------

    /// Stores `listener` as the consumer of [`RetryEvent`]s.
    pub fn on_retry<L>(mut self, listener: L) -> Self
    where
        L: Consumer<RetryEvent<T>> + 'static,
    {
        self.on_retry = Some(listener.into_box());
        self
    }

    /// Stores `listener` as the consumer of [`SuccessEvent`]s.
    pub fn on_success<L>(mut self, listener: L) -> Self
    where
        L: Consumer<SuccessEvent<T>> + 'static,
    {
        self.on_success = Some(listener.into_box());
        self
    }

    /// Stores `listener` as the consumer of [`FailureEvent`]s.
    pub fn on_failure<L>(mut self, listener: L) -> Self
    where
        L: Consumer<FailureEvent<T>> + 'static,
    {
        self.on_failure = Some(listener.into_box());
        self
    }

    /// Stores `listener` as the consumer of [`AbortEvent`]s.
    pub fn on_abort<L>(mut self, listener: L) -> Self
    where
        L: Consumer<AbortEvent<T>> + 'static,
    {
        self.on_abort = Some(listener.into_box());
        self
    }

    /// Consumes the builder and hands it to `RetryExecutor::new`.
    pub fn build(self) -> super::executor::RetryExecutor<T, C> {
        super::executor::RetryExecutor::new(self)
    }

    // ---------------------------------------------------------------------
    // Decision logic used by the executor.
    // ---------------------------------------------------------------------

    /// Returns whether `error` should be retried.
    ///
    /// With no retryable error types registered every error is retryable;
    /// otherwise the error must match at least one registered type.
    pub(crate) fn should_retry_error(&self, error: &(dyn Error + 'static)) -> bool {
        self.failed_error_types.is_empty() || any_matcher_accepts(&self.failed_error_types, error)
    }

    /// Returns whether `error` should abort immediately, i.e. whether it
    /// matches at least one registered aborting error type.
    pub(crate) fn should_abort_error(&self, error: &(dyn Error + 'static)) -> bool {
        any_matcher_accepts(&self.abort_error_types, error)
    }

    /// Returns whether `result` is a retryable value: it is either listed in
    /// the retryable values or accepted by the retry predicate.
    pub(crate) fn should_retry_result(&self, result: &T) -> bool {
        Self::result_matches(&self.failed_results, &self.failed_condition, result)
    }

    /// Returns whether `result` is an aborting value: it is either listed in
    /// the aborting values or accepted by the abort predicate.
    pub(crate) fn should_abort_result(&self, result: &T) -> bool {
        Self::result_matches(&self.abort_results, &self.abort_condition, result)
    }

    /// Classifies the outcome of one attempt.
    ///
    /// * `Ok(value)`: retry rules are checked first, then abort rules;
    ///   a value matching neither is a success.
    /// * `Err(error)`: abort rules are checked first, then retry rules;
    ///   an error matching neither is aborted.
    pub(crate) fn get_retry_decision(
        &self,
        result: Result<T, Box<dyn Error + Send + Sync>>,
    ) -> RetryDecision<T> {
        match result {
            Ok(value) => {
                if self.should_retry_result(&value) {
                    RetryDecision::Retry(RetryReason::Result(value))
                } else if self.should_abort_result(&value) {
                    RetryDecision::Abort(AbortReason::Result(value))
                } else {
                    RetryDecision::Success(value)
                }
            }
            Err(error) => {
                if self.should_abort_error(error.as_ref()) {
                    RetryDecision::Abort(AbortReason::Error(error))
                } else if self.should_retry_error(error.as_ref()) {
                    RetryDecision::Retry(RetryReason::Error(error))
                } else {
                    // Neither an abort rule nor a retry rule matched: the
                    // error is not retryable, so stop.
                    RetryDecision::Abort(AbortReason::Error(error))
                }
            }
        }
    }

    /// Returns the registered retry listener, if any.
    pub(crate) fn retry_listener(&self) -> &Option<RetryEventListener<T>> {
        &self.on_retry
    }

    /// Returns the registered success listener, if any.
    pub(crate) fn success_listener(&self) -> &Option<SuccessEventListener<T>> {
        &self.on_success
    }

    /// Returns the registered failure listener, if any.
    pub(crate) fn failure_listener(&self) -> &Option<FailureEventListener<T>> {
        &self.on_failure
    }

    /// Returns the registered abort listener, if any.
    pub(crate) fn abort_listener(&self) -> &Option<AbortEventListener<T>> {
        &self.on_abort
    }

    /// Shared rule check for result values: `true` when `result` is one of
    /// `values` or is accepted by `condition`.
    fn result_matches(
        values: &HashSet<T>,
        condition: &ConditionPredicate<T>,
        result: &T,
    ) -> bool {
        values.contains(result)
            || match condition {
                Some(predicate) => predicate.test(result),
                None => false,
            }
    }
}

/// Matcher for a single concrete error type `E`.
fn matches_error_type<E: Error + 'static>(error: &(dyn Error + 'static)) -> bool {
    error.is::<E>()
}

/// Matcher used by the "all errors" rules: accepts every error.
fn matches_any_error(_error: &(dyn Error + 'static)) -> bool {
    true
}

/// Returns `true` when at least one registered matcher accepts `error`
/// (and therefore `false` when nothing is registered).
fn any_matcher_accepts(matchers: &ErrorMatchers, error: &(dyn Error + 'static)) -> bool {
    matchers.values().any(|accepts| accepts(error))
}
/// `Default` for the default-configuration builder.
impl<T: Clone + PartialEq + Eq + std::hash::Hash + Send + Sync + 'static> Default
    for RetryBuilder<T, DefaultRetryConfig>
{
    /// Returns exactly what `RetryBuilder::new()` returns.
    fn default() -> Self {
        Self::new()
    }
}
/// Private marker error type.
///
/// `RetryBuilder::no_failed_errors` registers this type's `TypeId` as a
/// sentinel meaning "no error is retryable"; no value of this type is
/// constructed anywhere in this file.
#[derive(Debug)]
struct NonExistentError;

impl Error for NonExistentError {}

impl std::fmt::Display for NonExistentError {
    /// Writes nothing: the sentinel carries no message.
    fn fmt(&self, _formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        Ok(())
    }
}