use std::error::Error;
use std::time::{Duration, Instant};
use qubit_function::Consumer;
use super::{
AbortEvent, DefaultRetryConfig, FailureEvent, RetryBuilder, RetryConfig, RetryDecision,
RetryError, RetryEvent, RetryReason, RetryResult, SuccessEvent,
};
/// Executes operations under the retry settings held by a [`RetryBuilder`].
///
/// `T` is the value type produced by the operation. `C` is the retry
/// configuration type and defaults to [`DefaultRetryConfig`].
pub struct RetryExecutor<T, C = DefaultRetryConfig>
where
    C: RetryConfig,
{
    /// Builder that supplies the limits, delay strategy and listeners.
    builder: RetryBuilder<T, C>,
}
impl<T, C> RetryExecutor<T, C>
where
T: Clone + PartialEq + Eq + std::hash::Hash + Send + Sync + 'static,
C: RetryConfig,
{
pub(crate) fn new(builder: RetryBuilder<T, C>) -> Self {
Self { builder }
}
/// Checks whether the overall time budget has run out.
///
/// Returns `None` when `max_duration` is unset or the time elapsed since
/// `start_time` is still below it. Otherwise the failure listener (if any)
/// is notified with the attempt count and elapsed time, and a
/// max-duration-exceeded error is returned.
fn check_max_duration_exceeded(
    &self,
    start_time: Instant,
    max_duration: Option<Duration>,
    attempt: u32,
) -> Option<RetryError> {
    // No limit configured: nothing can be exceeded.
    let limit = max_duration?;
    let elapsed = start_time.elapsed();
    if elapsed < limit {
        return None;
    }

    let event = FailureEvent::builder()
        .attempt_count(attempt)
        .total_duration(elapsed)
        .build();
    if let Some(listener) = self.builder.failure_listener() {
        listener.accept(&event);
    }
    Some(RetryError::max_duration_exceeded(elapsed, limit))
}
/// Applies the per-operation timeout to an already finished operation.
///
/// When a timeout is configured and `operation_duration` is strictly greater
/// than it, `result` is discarded (even if it was `Ok`) and replaced by an
/// operation-timeout error. In every other case `result` is returned as is.
fn check_operation_timeout(
    &self,
    result: Result<T, Box<dyn Error + Send + Sync>>,
    operation_duration: Duration,
) -> Result<T, Box<dyn Error + Send + Sync>> {
    match self.builder.operation_timeout() {
        Some(timeout) if operation_duration > timeout => {
            let timed_out: Box<dyn Error + Send + Sync> =
                Box::new(RetryError::operation_timeout(operation_duration, timeout));
            Err(timed_out)
        }
        _ => result,
    }
}
/// Reports a successful attempt and hands the value back to the caller.
///
/// If a success listener is registered it receives a `SuccessEvent` carrying
/// a clone of `value`, the attempt count and the time elapsed since
/// `start_time`. Always returns `Ok(value)`.
fn handle_success(&self, value: T, attempt: u32, start_time: Instant) -> RetryResult<T> {
    // Build the event (and pay for the clone) only when someone is listening.
    if let Some(listener) = self.builder.success_listener() {
        let success_event = SuccessEvent::builder()
            .result(value.clone())
            .attempt_count(attempt)
            .total_duration(start_time.elapsed())
            .build();
        listener.accept(&success_event);
    }
    Ok(value)
}
/// Reports an aborted run and produces the corresponding error.
///
/// The abort listener (if any) receives an `AbortEvent` with the abort
/// reason, the attempt count and the time elapsed since `start_time`.
/// Always returns an "aborted" `RetryError`.
fn handle_abort(
    &self,
    reason: super::AbortReason<T>,
    attempt: u32,
    start_time: Instant,
) -> RetryResult<T> {
    let elapsed = start_time.elapsed();
    let event = AbortEvent::builder()
        .reason(reason)
        .attempt_count(attempt)
        .total_duration(elapsed)
        .build();

    if let Some(listener) = self.builder.abort_listener() {
        listener.accept(&event);
    }

    Err(RetryError::aborted("Operation aborted"))
}
/// Returns `true` once `attempt` has reached (or passed) `max_attempts`.
fn check_max_attempts_exceeded(&self, attempt: u32, max_attempts: u32) -> bool {
    max_attempts <= attempt
}
fn handle_max_attempts_exceeded(
&self,
attempt: u32,
max_attempts: u32,
reason: RetryReason<T>,
start_time: Instant,
) -> RetryError {
let failure_event = match reason {
RetryReason::Error(error) => FailureEvent::builder()
.last_error(Some(error))
.attempt_count(attempt)
.total_duration(start_time.elapsed())
.build(),
RetryReason::Result(result) => FailureEvent::builder()
.last_result(Some(result))
.attempt_count(attempt)
.total_duration(start_time.elapsed())
.build(),
};
if let Some(listener) = self.builder.failure_listener() {
listener.accept(&failure_event);
}
RetryError::max_attempts_exceeded(attempt, max_attempts)
}
/// Asks the configured delay strategy for the wait time after `attempt`,
/// passing along the configured jitter factor.
fn calculate_delay(&self, attempt: u32) -> Duration {
    self.builder
        .delay_strategy()
        .calculate_delay(attempt, self.builder.jitter_factor())
}
fn create_retry_event(
&self,
attempt: u32,
max_attempts: u32,
reason: RetryReason<T>,
delay: Duration,
start_time: Instant,
) -> RetryEvent<T> {
match reason {
RetryReason::Error(error) => RetryEvent::builder()
.attempt_count(attempt)
.max_attempts(max_attempts)
.last_error(Some(error))
.next_delay(delay)
.total_duration(start_time.elapsed())
.build(),
RetryReason::Result(result) => RetryEvent::builder()
.attempt_count(attempt)
.max_attempts(max_attempts)
.last_result(Some(result))
.next_delay(delay)
.total_duration(start_time.elapsed())
.build(),
}
}
/// Notifies the retry listener (if any), then blocks the current thread for
/// `delay`. A zero delay skips the sleep.
fn trigger_retry_and_wait(&self, retry_event: RetryEvent<T>, delay: Duration) {
    if let Some(listener) = self.builder.retry_listener() {
        listener.accept(&retry_event);
    }

    if delay.is_zero() {
        return;
    }
    std::thread::sleep(delay);
}
/// Notifies the retry listener (if any), then waits for `delay` with
/// `tokio::time::sleep`. A zero delay skips the sleep.
async fn trigger_retry_and_wait_async(&self, retry_event: RetryEvent<T>, delay: Duration) {
    if let Some(listener) = self.builder.retry_listener() {
        listener.accept(&retry_event);
    }

    if delay.is_zero() {
        return;
    }
    tokio::time::sleep(delay).await;
}
/// Runs `operation` once and turns its outcome into a `RetryDecision`.
///
/// The call is timed; the measured duration is checked against the
/// operation timeout after the call returns, and the (possibly replaced)
/// result is passed to the builder's decision logic.
fn execute_operation_and_get_decision<F>(&self, operation: &mut F) -> RetryDecision<T>
where
    F: FnMut() -> Result<T, Box<dyn Error + Send + Sync>>,
{
    let started = Instant::now();
    let outcome = operation();
    let took = started.elapsed();

    let checked = self.check_operation_timeout(outcome, took);
    self.builder.get_retry_decision(checked)
}
/// Awaits `operation` once and turns its outcome into a `RetryDecision`.
///
/// With an operation timeout configured, the future is wrapped in
/// `tokio::time::timeout`. If the timeout fires, an operation-timeout error
/// (carrying the time measured since the attempt started) is passed to the
/// builder's decision logic in place of a result. Without a timeout the
/// future is awaited directly.
async fn execute_operation_async_and_get_decision<F, Fut>(
    &self,
    operation: &mut F,
) -> RetryDecision<T>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, Box<dyn Error + Send + Sync>>>,
{
    let started = Instant::now();
    let outcome = match self.builder.operation_timeout() {
        None => operation().await,
        Some(limit) => tokio::time::timeout(limit, operation())
            .await
            .unwrap_or_else(|_elapsed| {
                let waited = started.elapsed();
                Err(Box::new(RetryError::operation_timeout(waited, limit))
                    as Box<dyn Error + Send + Sync>)
            }),
    };
    self.builder.get_retry_decision(outcome)
}
/// Acts on the decision reached for one attempt (blocking variant).
///
/// * `Success`: reports success and returns `Ok(Some(value))`.
/// * `Retry`: if the attempt limit is reached, reports the failure and
///   returns the max-attempts error. Otherwise computes the delay, notifies
///   the retry listener, sleeps on the current thread and returns `Ok(None)`
///   so the caller tries again.
/// * `Abort`: reports the abort and returns the resulting error.
fn handle_decision(
    &self,
    decision: RetryDecision<T>,
    attempt: u32,
    max_attempts: u32,
    start_time: Instant,
) -> Result<Option<T>, RetryError> {
    match decision {
        // `handle_success` returns the value it was given, so it can be
        // forwarded directly instead of cloning it a second time.
        RetryDecision::Success(value) => {
            self.handle_success(value, attempt, start_time).map(Some)
        }
        RetryDecision::Retry(reason) => {
            if self.check_max_attempts_exceeded(attempt, max_attempts) {
                return Err(self.handle_max_attempts_exceeded(
                    attempt,
                    max_attempts,
                    reason,
                    start_time,
                ));
            }
            let delay = self.calculate_delay(attempt);
            let retry_event =
                self.create_retry_event(attempt, max_attempts, reason, delay, start_time);
            self.trigger_retry_and_wait(retry_event, delay);
            Ok(None)
        }
        RetryDecision::Abort(reason) => {
            self.handle_abort(reason, attempt, start_time).map(|_| None)
        }
    }
}
/// Acts on the decision reached for one attempt (async variant).
///
/// * `Success`: reports success and returns `Ok(Some(value))`.
/// * `Retry`: if the attempt limit is reached, reports the failure and
///   returns the max-attempts error. Otherwise computes the delay, notifies
///   the retry listener, awaits the delay without blocking the thread and
///   returns `Ok(None)` so the caller tries again.
/// * `Abort`: reports the abort and returns the resulting error.
async fn handle_decision_async(
    &self,
    decision: RetryDecision<T>,
    attempt: u32,
    max_attempts: u32,
    start_time: Instant,
) -> Result<Option<T>, RetryError> {
    match decision {
        // `handle_success` returns the value it was given, so it can be
        // forwarded directly instead of cloning it a second time.
        RetryDecision::Success(value) => {
            self.handle_success(value, attempt, start_time).map(Some)
        }
        RetryDecision::Retry(reason) => {
            if self.check_max_attempts_exceeded(attempt, max_attempts) {
                return Err(self.handle_max_attempts_exceeded(
                    attempt,
                    max_attempts,
                    reason,
                    start_time,
                ));
            }
            let delay = self.calculate_delay(attempt);
            let retry_event =
                self.create_retry_event(attempt, max_attempts, reason, delay, start_time);
            self.trigger_retry_and_wait_async(retry_event, delay).await;
            Ok(None)
        }
        RetryDecision::Abort(reason) => {
            self.handle_abort(reason, attempt, start_time).map(|_| None)
        }
    }
}
/// Runs `operation` repeatedly on the current thread until it succeeds, is
/// aborted, or a retry limit is hit.
///
/// Before each attempt the overall time budget is checked. Waiting between
/// attempts blocks the calling thread.
///
/// # Errors
///
/// Returns a `RetryError` when the maximum duration is exceeded, the
/// maximum number of attempts is reached, or the run is aborted.
pub fn run<F>(&self, mut operation: F) -> RetryResult<T>
where
    F: FnMut() -> Result<T, Box<dyn Error + Send + Sync>>,
{
    let started_at = Instant::now();
    let attempt_limit = self.builder.max_attempts();
    let time_limit = self.builder.max_duration();

    let mut attempt: u32 = 0;
    loop {
        attempt += 1;

        if let Some(error) = self.check_max_duration_exceeded(started_at, time_limit, attempt) {
            return Err(error);
        }

        let decision = self.execute_operation_and_get_decision(&mut operation);
        // `None` means "retry": fall through to the next loop iteration.
        if let Some(value) =
            self.handle_decision(decision, attempt, attempt_limit, started_at)?
        {
            return Ok(value);
        }
    }
}
/// Awaits `operation` repeatedly until it succeeds, is aborted, or a retry
/// limit is hit.
///
/// Before each attempt the overall time budget is checked. Waiting between
/// attempts uses `tokio::time::sleep`.
///
/// # Errors
///
/// Returns a `RetryError` when the maximum duration is exceeded, the
/// maximum number of attempts is reached, or the run is aborted.
pub async fn run_async<F, Fut>(&self, mut operation: F) -> RetryResult<T>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, Box<dyn Error + Send + Sync>>>,
{
    let started_at = Instant::now();
    let attempt_limit = self.builder.max_attempts();
    let time_limit = self.builder.max_duration();

    let mut attempt: u32 = 0;
    loop {
        attempt += 1;

        if let Some(error) = self.check_max_duration_exceeded(started_at, time_limit, attempt) {
            return Err(error);
        }

        let decision = self
            .execute_operation_async_and_get_decision(&mut operation)
            .await;
        let outcome = self
            .handle_decision_async(decision, attempt, attempt_limit, started_at)
            .await?;
        // `None` means "retry": fall through to the next loop iteration.
        if let Some(value) = outcome {
            return Ok(value);
        }
    }
}
pub fn config(&self) -> &RetryBuilder<T, C> {
&self.builder
}
}