use qubit_common::BoxError;
use qubit_function::{BiConsumer, BiFunction, Consumer};
use std::fmt;
#[cfg(feature = "tokio")]
use std::future::Future;
use std::io;
use std::panic;
use std::sync::Arc;
use std::sync::mpsc;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
#[cfg(feature = "tokio")]
use super::async_attempt::AsyncAttempt;
#[cfg(feature = "tokio")]
use super::async_value_operation::AsyncValueOperation;
use super::attempt_cancel_token::AttemptCancelToken;
use super::blocking_attempt_message::BlockingAttemptMessage;
use super::retry_flow_action::RetryFlowAction;
use super::sync_attempt::SyncAttempt;
use super::sync_value_operation::SyncValueOperation;
use crate::event::RetryListeners;
use crate::{
AttemptExecutorError, AttemptFailure, AttemptFailureDecision, AttemptPanic,
AttemptTimeoutPolicy, AttemptTimeoutSource, RetryAfterHint, RetryBuilder, RetryConfigError,
RetryContext, RetryError, RetryErrorReason, RetryOptions,
};
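/// Retry executor: drives an operation through repeated attempts according
/// to the configured [`RetryOptions`], applying an optional retry-after hint
/// and notifying the registered [`RetryListeners`] at each lifecycle event.
///
/// With `isolate_listener_panics` enabled, a panicking listener is caught
/// and ignored instead of unwinding into the retry loop.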
#[derive(Clone)]
pub struct Retry<E = BoxError> {
options: RetryOptions,
retry_after_hint: Option<RetryAfterHint<E>>,
isolate_listener_panics: bool,
listeners: RetryListeners<E>,
}
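/// The timeout in force for a single attempt, resolved from the configured
/// attempt timeout and the remaining `max_elapsed` budget, plus which of the
/// two sources applied.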
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct EffectiveAttemptTimeout {
duration: Option<Duration>,
source: Option<AttemptTimeoutSource>,
}
impl EffectiveAttemptTimeout {
#[inline]
fn new(duration: Option<Duration>, source: Option<AttemptTimeoutSource>) -> Self {
Self { duration, source }
}
#[inline]
fn is_max_elapsed_timeout<E>(&self, failure: &AttemptFailure<E>) -> bool {
self.source == Some(AttemptTimeoutSource::MaxElapsed)
&& matches!(failure, AttemptFailure::Timeout)
}
}
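/// Outcome of one blocking attempt, together with the number of worker
/// threads (0 or 1) that were still running when the attempt was abandoned.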
struct BlockingAttemptOutcome<T, E> {
result: Result<T, AttemptFailure<E>>,
unreaped_worker_count: u32,
}
impl<T, E> BlockingAttemptOutcome<T, E> {
#[inline]
fn new(result: Result<T, AttemptFailure<E>>, unreaped_worker_count: u32) -> Self {
Self {
result,
unreaped_worker_count,
}
}
}
#[allow(clippy::result_large_err)]
impl<E> Retry<E> {
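/// Creates a [`RetryBuilder`] for assembling a configured `Retry`.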
#[inline]
pub fn builder() -> RetryBuilder<E> {
RetryBuilder::new()
}
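/// Builds a `Retry` from the given [`RetryOptions`], validating them through
/// the builder.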
pub fn from_options(options: RetryOptions) -> Result<Self, RetryConfigError> {
Self::builder().options(options).build()
}
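/// Returns the options this instance was built with.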
#[inline]
pub fn options(&self) -> &RetryOptions {
&self.options
}
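/// Runs `operation` inline on the calling thread, retrying failures until it
/// succeeds, a failure decision aborts, or the attempt/elapsed budgets run
/// out.
///
/// An inline attempt cannot be interrupted, so if an attempt timeout is
/// configured this returns [`RetryErrorReason::UnsupportedOperation`]
/// immediately; use [`Retry::run_in_worker`] for timeout enforcement.
///
/// A minimal usage sketch (`fetch` and the prepared `options` are
/// illustrative, not part of this crate):
///
/// ```ignore
/// let retry = Retry::from_options(options)?;
/// let body = retry.run(|| fetch("https://example.com"))?;
/// ```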
pub fn run<T, F>(&self, mut operation: F) -> Result<T, RetryError<E>>
where
F: FnMut() -> Result<T, E>,
{
if self.options.attempt_timeout().is_some() {
let attempt_timeout = self.attempt_timeout_duration();
return Err(self.emit_error(RetryError::new(
RetryErrorReason::UnsupportedOperation,
None,
RetryContext::new(
0,
self.options.max_attempts.get(),
self.options.max_elapsed,
Duration::ZERO,
Duration::ZERO,
attempt_timeout,
)
.with_attempt_timeout_source(Some(AttemptTimeoutSource::Configured)),
)));
}
let mut operation = SyncValueOperation::new(&mut operation);
self.run_sync_operation(&mut operation)
.map(|()| operation.into_value())
}
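/// Runs `operation` on a dedicated worker thread per attempt so the
/// effective attempt timeout (configured, or derived from the remaining
/// `max_elapsed` budget) can be enforced from the calling thread.
///
/// On timeout the worker's [`AttemptCancelToken`] is cancelled and the
/// worker is given `worker_cancel_grace` to exit; a worker that keeps
/// running is reported via the context's unreaped worker count and blocks
/// further retries with [`RetryErrorReason::WorkerStillRunning`].
///
/// A minimal usage sketch (`read_sensor` and `interrupted_error` are
/// illustrative):
///
/// ```ignore
/// let sample = retry.run_in_worker(|cancel| {
///     if cancel.is_cancelled() {
///         return Err(interrupted_error());
///     }
///     read_sensor()
/// })?;
/// ```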
pub fn run_in_worker<T, F>(&self, operation: F) -> Result<T, RetryError<E>>
where
T: Send + 'static,
E: Send + 'static,
F: Fn(AttemptCancelToken) -> Result<T, E> + Send + Sync + 'static,
{
let operation = Arc::new(operation);
let mut total_elapsed = Duration::ZERO;
let mut attempts = 0;
let mut last_failure = None;
loop {
let attempt_timeout = self.effective_attempt_timeout(total_elapsed);
if let Some(error) = self.elapsed_error(
total_elapsed,
attempts,
last_failure.take(),
attempt_timeout,
) {
return Err(self.emit_error(error));
}
attempts += 1;
let before_context = self
.context(
total_elapsed,
attempts,
Duration::ZERO,
attempt_timeout.duration,
)
.with_attempt_timeout_source(attempt_timeout.source);
self.emit_before_attempt(&before_context);
let attempt_start = Instant::now();
let outcome =
self.call_blocking_attempt(Arc::clone(&operation), attempt_timeout.duration);
let attempt_elapsed = attempt_start.elapsed();
total_elapsed = add_elapsed(total_elapsed, attempt_elapsed);
let context = self
.context(
total_elapsed,
attempts,
attempt_elapsed,
attempt_timeout.duration,
)
.with_attempt_timeout_source(attempt_timeout.source)
.with_unreaped_worker_count(outcome.unreaped_worker_count);
match outcome.result {
Ok(value) => {
self.emit_attempt_success(&context);
return Ok(value);
}
Err(failure) => {
if attempt_timeout.is_max_elapsed_timeout(&failure) {
return Err(self.emit_error(RetryError::new(
RetryErrorReason::MaxElapsedExceeded,
Some(failure),
context,
)));
}
let retry_block_reason = (context.unreaped_worker_count() > 0)
.then_some(RetryErrorReason::WorkerStillRunning);
match self.handle_failure(attempts, failure, context, retry_block_reason) {
RetryFlowAction::Retry { delay, failure } => {
if !delay.is_zero() {
std::thread::sleep(delay);
}
last_failure = Some(failure);
}
RetryFlowAction::Finished(error) => return Err(self.emit_error(error)),
}
}
}
}
}
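/// Alias for [`Retry::run_in_worker`].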
#[inline]
pub fn run_blocking_with_timeout<T, F>(&self, operation: F) -> Result<T, RetryError<E>>
where
T: Send + 'static,
E: Send + 'static,
F: Fn(AttemptCancelToken) -> Result<T, E> + Send + Sync + 'static,
{
self.run_in_worker(operation)
}
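/// Shared retry loop for the inline synchronous path; no attempt timeout is
/// applied here.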
fn run_sync_operation(&self, operation: &mut dyn SyncAttempt<E>) -> Result<(), RetryError<E>> {
let mut total_elapsed = Duration::ZERO;
let mut attempts = 0;
let mut last_failure = None;
loop {
if let Some(error) = self.elapsed_error(
total_elapsed,
attempts,
last_failure.take(),
EffectiveAttemptTimeout::new(None, None),
) {
return Err(self.emit_error(error));
}
attempts += 1;
let before_context = self.context(total_elapsed, attempts, Duration::ZERO, None);
self.emit_before_attempt(&before_context);
let attempt_start = Instant::now();
match operation.call() {
Ok(()) => {
let attempt_elapsed = attempt_start.elapsed();
total_elapsed = add_elapsed(total_elapsed, attempt_elapsed);
let context = self.context(total_elapsed, attempts, attempt_elapsed, None);
self.emit_attempt_success(&context);
return Ok(());
}
Err(failure) => {
let attempt_elapsed = attempt_start.elapsed();
total_elapsed = add_elapsed(total_elapsed, attempt_elapsed);
let context = self.context(total_elapsed, attempts, attempt_elapsed, None);
match self.handle_failure(attempts, failure, context, None) {
RetryFlowAction::Retry { delay, failure } => {
if !delay.is_zero() {
std::thread::sleep(delay);
}
last_failure = Some(failure);
}
RetryFlowAction::Finished(error) => return Err(self.emit_error(error)),
}
}
}
}
}
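/// Runs an async `operation`, retrying like [`Retry::run`] but enforcing the
/// effective attempt timeout with `tokio::time::timeout`.
///
/// A minimal usage sketch (`fetch_async` is illustrative):
///
/// ```ignore
/// let body = retry.run_async(|| fetch_async()).await?;
/// ```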
#[cfg(feature = "tokio")]
pub async fn run_async<T, F, Fut>(&self, mut operation: F) -> Result<T, RetryError<E>>
where
F: FnMut() -> Fut,
Fut: Future<Output = Result<T, E>>,
{
let mut operation = AsyncValueOperation::new(&mut operation);
self.run_async_operation(&mut operation)
.await
.map(|()| operation.into_value())
}
#[cfg(feature = "tokio")]
async fn run_async_operation(
&self,
operation: &mut dyn AsyncAttempt<E>,
) -> Result<(), RetryError<E>> {
let mut total_elapsed = Duration::ZERO;
let mut attempts = 0;
let mut last_failure = None;
loop {
let attempt_timeout = self.effective_attempt_timeout(total_elapsed);
if let Some(error) = self.elapsed_error(
total_elapsed,
attempts,
last_failure.take(),
attempt_timeout,
) {
return Err(self.emit_error(error));
}
attempts += 1;
let before_context = self
.context(
total_elapsed,
attempts,
Duration::ZERO,
attempt_timeout.duration,
)
.with_attempt_timeout_source(attempt_timeout.source);
self.emit_before_attempt(&before_context);
let attempt_start = Instant::now();
let result = if let Some(timeout) = attempt_timeout.duration {
match tokio::time::timeout(timeout, operation.call()).await {
Ok(result) => result,
Err(_) => Err(AttemptFailure::Timeout),
}
} else {
operation.call().await
};
let attempt_elapsed = attempt_start.elapsed();
total_elapsed = add_elapsed(total_elapsed, attempt_elapsed);
let context = self
.context(
total_elapsed,
attempts,
attempt_elapsed,
attempt_timeout.duration,
)
.with_attempt_timeout_source(attempt_timeout.source);
match result {
Ok(()) => {
self.emit_attempt_success(&context);
return Ok(());
}
Err(failure) => {
if attempt_timeout.is_max_elapsed_timeout(&failure) {
return Err(self.emit_error(RetryError::new(
RetryErrorReason::MaxElapsedExceeded,
Some(failure),
context,
)));
}
match self.handle_failure(attempts, failure, context, None) {
RetryFlowAction::Retry { delay, failure } => {
sleep_async(delay).await;
last_failure = Some(failure);
}
RetryFlowAction::Finished(error) => return Err(self.emit_error(error)),
}
}
}
}
}
pub(super) fn new(
options: RetryOptions,
retry_after_hint: Option<RetryAfterHint<E>>,
isolate_listener_panics: bool,
listeners: RetryListeners<E>,
) -> Self {
Self {
options,
retry_after_hint,
isolate_listener_panics,
listeners,
}
}
fn context(
&self,
total_elapsed: Duration,
attempt: u32,
attempt_elapsed: Duration,
attempt_timeout: Option<Duration>,
) -> RetryContext {
RetryContext::new(
attempt,
self.options.max_attempts.get(),
self.options.max_elapsed,
total_elapsed,
attempt_elapsed,
attempt_timeout,
)
}
#[inline]
fn attempt_timeout_duration(&self) -> Option<Duration> {
self.options
.attempt_timeout()
.map(|attempt_timeout| attempt_timeout.timeout())
}
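/// Resolves the timeout for the next attempt: the configured attempt
/// timeout, the remaining `max_elapsed` budget, or the shorter of the two,
/// recording which source won.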
fn effective_attempt_timeout(&self, total_elapsed: Duration) -> EffectiveAttemptTimeout {
let configured = self.attempt_timeout_duration();
let remaining = self.remaining_elapsed(total_elapsed);
match (configured, remaining) {
(None, None) => EffectiveAttemptTimeout::new(None, None),
(Some(timeout), None) => {
EffectiveAttemptTimeout::new(Some(timeout), Some(AttemptTimeoutSource::Configured))
}
(None, Some(remaining)) => EffectiveAttemptTimeout::new(
Some(remaining),
Some(AttemptTimeoutSource::MaxElapsed),
),
(Some(configured), Some(remaining)) if configured <= remaining => {
EffectiveAttemptTimeout::new(
Some(configured),
Some(AttemptTimeoutSource::Configured),
)
}
(Some(_), Some(remaining)) => EffectiveAttemptTimeout::new(
Some(remaining),
Some(AttemptTimeoutSource::MaxElapsed),
),
}
}
#[inline]
fn remaining_elapsed(&self, total_elapsed: Duration) -> Option<Duration> {
self.options
.max_elapsed
.map(|max_elapsed| max_elapsed.saturating_sub(total_elapsed))
}
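/// Executes one blocking attempt on a freshly spawned worker thread. The
/// worker catches panics with `catch_unwind` and reports back over a bounded
/// channel, letting the caller enforce `attempt_timeout` via `recv_timeout`.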
fn call_blocking_attempt<T, F>(
&self,
operation: Arc<F>,
attempt_timeout: Option<Duration>,
) -> BlockingAttemptOutcome<T, E>
where
T: Send + 'static,
E: Send + 'static,
F: Fn(AttemptCancelToken) -> Result<T, E> + Send + Sync + 'static,
{
let token = AttemptCancelToken::new();
let (sender, receiver) = mpsc::sync_channel(1);
let worker_token = token.clone();
let worker = std::thread::Builder::new()
.name("qubit-retry-worker".to_string())
.spawn(move || {
let result =
panic::catch_unwind(panic::AssertUnwindSafe(|| operation(worker_token)));
let message = match result {
Ok(result) => BlockingAttemptMessage::Result(result),
Err(payload) => {
BlockingAttemptMessage::Panic(AttemptPanic::from_payload(payload))
}
};
let _ = sender.send(message);
});
#[cfg(not(coverage))]
let worker = match worker {
Ok(worker) => worker,
Err(error) => {
return BlockingAttemptOutcome::new(
Err(worker_spawn_error_to_attempt_failure(error)),
0,
);
}
};
#[cfg(coverage)]
let worker = worker.expect("retry worker should spawn during coverage");
match attempt_timeout {
Some(attempt_timeout) => {
let message = receiver.recv_timeout(attempt_timeout);
self.worker_timeout_message_to_attempt_outcome(message, receiver, worker, &token)
}
None => {
let result = worker_recv_message_to_attempt_result(receiver.recv());
join_finished_worker(worker);
BlockingAttemptOutcome::new(result, 0)
}
}
}
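/// Decides how to proceed after a failed attempt: applies the retry-after
/// hint and listener decisions, then either finishes (abort, exhausted
/// attempts or elapsed budget, blocked retry) or schedules the next attempt
/// with its computed delay.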
fn handle_failure(
&self,
attempts: u32,
failure: AttemptFailure<E>,
context: RetryContext,
retry_block_reason: Option<RetryErrorReason>,
) -> RetryFlowAction<E> {
let hint = self
.retry_after_hint
.as_ref()
.and_then(|hint| self.invoke_listener(|| hint.apply(&failure, &context)));
let context = context.with_retry_after_hint(hint);
let decision =
self.resolve_failure_decision(self.failure_decision(&failure, &context), &failure);
if decision == AttemptFailureDecision::Abort {
return RetryFlowAction::Finished(RetryError::new(
RetryErrorReason::Aborted,
Some(failure),
context,
));
}
let max_attempts = self.options.max_attempts.get();
if attempts >= max_attempts {
return RetryFlowAction::Finished(RetryError::new(
RetryErrorReason::AttemptsExceeded,
Some(failure),
context,
));
}
if self.elapsed_budget_exhausted(context.total_elapsed()) {
return RetryFlowAction::Finished(RetryError::new(
RetryErrorReason::MaxElapsedExceeded,
Some(failure),
context,
));
}
if let Some(reason) = retry_block_reason {
return RetryFlowAction::Finished(RetryError::new(reason, Some(failure), context));
}
let delay = self.retry_delay(decision, attempts, hint);
let context = context.with_next_delay(delay);
self.emit_retry_scheduled(&failure, &context);
RetryFlowAction::Retry { delay, failure }
}
fn failure_decision(
&self,
failure: &AttemptFailure<E>,
context: &RetryContext,
) -> AttemptFailureDecision {
let mut decision = AttemptFailureDecision::UseDefault;
for listener in &self.listeners.failure {
let current = self.invoke_listener(|| listener.apply(failure, context));
if current != AttemptFailureDecision::UseDefault {
decision = current;
}
}
decision
}
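/// Applies built-in defaults when listeners left the decision at
/// `UseDefault`: timeouts follow the configured [`AttemptTimeoutPolicy`],
/// panics and executor errors abort, and anything else keeps the default
/// backoff-driven retry.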
fn resolve_failure_decision(
&self,
decision: AttemptFailureDecision,
failure: &AttemptFailure<E>,
) -> AttemptFailureDecision {
if decision != AttemptFailureDecision::UseDefault {
return decision;
}
if matches!(failure, AttemptFailure::Timeout)
&& let Some(attempt_timeout) = self.options.attempt_timeout()
{
match attempt_timeout.policy() {
AttemptTimeoutPolicy::Retry => AttemptFailureDecision::Retry,
AttemptTimeoutPolicy::Abort => AttemptFailureDecision::Abort,
}
} else if matches!(
failure,
AttemptFailure::Panic(_) | AttemptFailure::Executor(_)
) {
AttemptFailureDecision::Abort
} else {
AttemptFailureDecision::UseDefault
}
}
fn retry_delay(
&self,
decision: AttemptFailureDecision,
attempts: u32,
hint: Option<Duration>,
) -> Duration {
match decision {
AttemptFailureDecision::RetryAfter(delay) => delay,
AttemptFailureDecision::UseDefault => hint.unwrap_or_else(|| {
self.options
.jitter
.delay_for_attempt(&self.options.delay, attempts)
}),
AttemptFailureDecision::Retry | AttemptFailureDecision::Abort => self
.options
.jitter
.delay_for_attempt(&self.options.delay, attempts),
}
}
fn elapsed_error(
&self,
total_elapsed: Duration,
attempts: u32,
last_failure: Option<AttemptFailure<E>>,
attempt_timeout: EffectiveAttemptTimeout,
) -> Option<RetryError<E>> {
if !self.elapsed_budget_exhausted(total_elapsed) {
return None;
}
Some(RetryError::new(
RetryErrorReason::MaxElapsedExceeded,
last_failure,
self.context(
total_elapsed,
attempts,
Duration::ZERO,
attempt_timeout.duration,
)
.with_attempt_timeout_source(attempt_timeout.source),
))
}
#[inline]
fn elapsed_budget_exhausted(&self, total_elapsed: Duration) -> bool {
self.options
.max_elapsed
.is_some_and(|max_elapsed| total_elapsed >= max_elapsed)
}
fn emit_before_attempt(&self, context: &RetryContext) {
for listener in &self.listeners.before_attempt {
self.invoke_listener(|| {
listener.accept(context);
});
}
}
fn emit_attempt_success(&self, context: &RetryContext) {
for listener in &self.listeners.attempt_success {
self.invoke_listener(|| {
listener.accept(context);
});
}
}
fn emit_retry_scheduled(&self, failure: &AttemptFailure<E>, context: &RetryContext) {
for listener in &self.listeners.retry_scheduled {
self.invoke_listener(|| {
listener.accept(failure, context);
});
}
}
fn emit_error(&self, error: RetryError<E>) -> RetryError<E> {
for listener in &self.listeners.error {
self.invoke_listener(|| {
listener.accept(&error, error.context());
});
}
error
}
fn worker_timeout_message_to_attempt_outcome<T>(
&self,
message: Result<BlockingAttemptMessage<T, E>, mpsc::RecvTimeoutError>,
receiver: mpsc::Receiver<BlockingAttemptMessage<T, E>>,
worker: JoinHandle<()>,
token: &AttemptCancelToken,
) -> BlockingAttemptOutcome<T, E>
where
T: Send + 'static,
E: Send + 'static,
{
match message {
Ok(message) => {
let result = worker_message_to_attempt_result(message);
join_finished_worker(worker);
BlockingAttemptOutcome::new(result, 0)
}
Err(mpsc::RecvTimeoutError::Timeout) => {
token.cancel();
let worker_exited =
wait_for_cancelled_worker(&receiver, worker, self.options.worker_cancel_grace);
let unreaped_worker_count = if worker_exited { 0 } else { 1 };
BlockingAttemptOutcome::new(Err(AttemptFailure::Timeout), unreaped_worker_count)
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
join_finished_worker(worker);
BlockingAttemptOutcome::new(Err(worker_disconnected_attempt_failure()), 0)
}
}
}
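/// Runs a listener callback, catching and discarding panics when
/// `isolate_listener_panics` is set.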
fn invoke_listener<R>(&self, call: impl FnOnce() -> R) -> R
where
R: Default,
{
if self.isolate_listener_panics {
panic::catch_unwind(panic::AssertUnwindSafe(call)).unwrap_or_default()
} else {
call()
}
}
}
impl<E> fmt::Debug for Retry<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Retry")
.field("options", &self.options)
.finish_non_exhaustive()
}
}
fn worker_message_to_attempt_result<T, E>(
message: BlockingAttemptMessage<T, E>,
) -> Result<T, AttemptFailure<E>> {
match message {
BlockingAttemptMessage::Result(result) => result.map_err(AttemptFailure::Error),
BlockingAttemptMessage::Panic(panic) => Err(AttemptFailure::Panic(panic)),
}
}
#[cfg_attr(coverage, allow(dead_code))]
fn worker_spawn_error_to_attempt_failure<E>(error: io::Error) -> AttemptFailure<E> {
AttemptFailure::Executor(AttemptExecutorError::from_spawn_error(error))
}
#[cfg(all(coverage, not(test)))]
fn worker_timeout_message_to_attempt_result<T, E>(
message: Result<BlockingAttemptMessage<T, E>, mpsc::RecvTimeoutError>,
token: &AttemptCancelToken,
) -> Result<T, AttemptFailure<E>> {
match message {
Ok(message) => worker_message_to_attempt_result(message),
Err(mpsc::RecvTimeoutError::Timeout) => {
token.cancel();
Err(AttemptFailure::Timeout)
}
Err(mpsc::RecvTimeoutError::Disconnected) => Err(worker_disconnected_attempt_failure()),
}
}
fn worker_recv_message_to_attempt_result<T, E>(
message: Result<BlockingAttemptMessage<T, E>, mpsc::RecvError>,
) -> Result<T, AttemptFailure<E>> {
match message {
Ok(message) => worker_message_to_attempt_result(message),
Err(_) => Err(worker_disconnected_attempt_failure()),
}
}
fn worker_disconnected_attempt_failure<E>() -> AttemptFailure<E> {
AttemptFailure::Executor(AttemptExecutorError::from_worker_disconnected())
}
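/// After cancellation, waits up to `grace` for the worker to report back and
/// joins it if it did; returns whether the worker exited within the grace
/// period.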
fn wait_for_cancelled_worker<T, E>(
receiver: &mpsc::Receiver<BlockingAttemptMessage<T, E>>,
worker: JoinHandle<()>,
grace: Duration,
) -> bool {
let exited = if grace.is_zero() {
match receiver.try_recv() {
Ok(_) | Err(mpsc::TryRecvError::Disconnected) => true,
Err(mpsc::TryRecvError::Empty) => false,
}
} else {
match receiver.recv_timeout(grace) {
Ok(_) | Err(mpsc::RecvTimeoutError::Disconnected) => true,
Err(mpsc::RecvTimeoutError::Timeout) => false,
}
};
if exited {
join_finished_worker(worker);
}
exited
}
fn join_finished_worker(worker: JoinHandle<()>) {
let _ = worker.join();
}
fn add_elapsed(total_elapsed: Duration, attempt_elapsed: Duration) -> Duration {
total_elapsed.saturating_add(attempt_elapsed)
}
#[cfg(feature = "tokio")]
async fn sleep_async(delay: Duration) {
if !delay.is_zero() {
tokio::time::sleep(delay).await;
}
}
#[cfg(all(coverage, not(test)))]
#[doc(hidden)]
pub mod coverage_support {
use std::error::Error;
use std::io;
use std::sync::mpsc;
use std::time::Duration;
use crate::{
AttemptCancelToken, AttemptExecutorError, AttemptFailure, AttemptPanic, RetryContext,
RetryError, RetryErrorReason,
};
use super::{
BlockingAttemptMessage, worker_message_to_attempt_result,
worker_recv_message_to_attempt_result, worker_spawn_error_to_attempt_failure,
worker_timeout_message_to_attempt_result,
};
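/// Exercises the defensive fallback paths that only compile under
/// `cfg(coverage)`, returning a human-readable diagnostic per path.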
pub fn exercise_defensive_paths() -> Vec<String> {
let mut diagnostics = Vec::new();
let spawn_failure =
worker_spawn_error_to_attempt_failure::<&'static str>(io::Error::other("spawn failed"));
diagnostics.push(spawn_failure.to_string());
let timeout_token = AttemptCancelToken::new();
let timeout = worker_timeout_message_to_attempt_result::<(), &'static str>(
Err(mpsc::RecvTimeoutError::Timeout),
&timeout_token,
)
.expect_err("timeout receive should become an attempt failure");
diagnostics.push(format!(
"{timeout}; cancelled={}",
timeout_token.is_cancelled()
));
let timeout_disconnected = worker_timeout_message_to_attempt_result::<(), &'static str>(
Err(mpsc::RecvTimeoutError::Disconnected),
&AttemptCancelToken::new(),
)
.expect_err("disconnected timeout receive should become an executor failure");
diagnostics.push(timeout_disconnected.to_string());
let recv_disconnected =
worker_recv_message_to_attempt_result::<(), &'static str>(Err(mpsc::RecvError))
.expect_err("disconnected receive should become an executor failure");
diagnostics.push(recv_disconnected.to_string());
let panic_message = worker_message_to_attempt_result::<(), &'static str>(
BlockingAttemptMessage::Panic(AttemptPanic::new("coverage panic")),
)
.expect_err("panic message should become an attempt failure");
diagnostics.push(panic_message.to_string());
let static_panic = AttemptPanic::from_payload(Box::new("static panic"));
diagnostics.push(static_panic.to_string());
let string_panic = AttemptPanic::from_payload(Box::new(String::from("owned panic")));
diagnostics.push(string_panic.to_string());
let executor_error = RetryError::new(
RetryErrorReason::Aborted,
Some(AttemptFailure::<io::Error>::Executor(
AttemptExecutorError::new("executor source"),
)),
RetryContext::new(1, 1, None, Duration::ZERO, Duration::ZERO, None),
);
diagnostics.push(format!(
"executor reason={:?}; attempts={}; context_attempt={}",
executor_error.reason(),
executor_error.attempts(),
executor_error.context().attempt(),
));
diagnostics.push(
executor_error
.source()
.expect("executor failure should be an error source")
.to_string(),
);
let timeout_error = RetryError::new(
RetryErrorReason::Aborted,
Some(AttemptFailure::<io::Error>::Timeout),
RetryContext::new(1, 1, None, Duration::ZERO, Duration::ZERO, None),
);
diagnostics.push(format!(
"timeout source absent={}",
timeout_error.source().is_none()
));
let app_error = RetryError::new(
RetryErrorReason::AttemptsExceeded,
Some(AttemptFailure::<io::Error>::Error(io::Error::other(
"application source",
))),
RetryContext::new(2, 2, None, Duration::ZERO, Duration::ZERO, None),
);
diagnostics.push(
app_error
.last_failure()
.expect("application failure should exist")
.to_string(),
);
diagnostics.push(
app_error
.last_error()
.expect("last application error should exist")
.to_string(),
);
diagnostics.push(app_error.to_string());
let owned_error = RetryError::new(
RetryErrorReason::AttemptsExceeded,
Some(AttemptFailure::<io::Error>::Error(io::Error::other(
"owned application error",
))),
RetryContext::new(2, 2, None, Duration::ZERO, Duration::ZERO, None),
);
diagnostics.push(
owned_error
.into_last_error()
.expect("owned application error should be returned")
.to_string(),
);
let parted_error = RetryError::<io::Error>::new(
RetryErrorReason::MaxElapsedExceeded,
None,
RetryContext::new(
0,
2,
Some(Duration::ZERO),
Duration::ZERO,
Duration::ZERO,
None,
),
);
let (reason, last_failure, context) = parted_error.into_parts();
diagnostics.push(format!(
"parts reason={reason:?}; last_failure={}; max_elapsed={:?}",
last_failure.is_some(),
context.max_elapsed(),
));
diagnostics
}
}