use backon::{BackoffBuilder, ExponentialBuilder};
use std::{
convert::Infallible, error::Error, fmt, future::Future, time::Duration,
};
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GoneCheckResult {
StillAvailable,
Gone,
}
#[derive(Debug)]
pub struct RetryOperationError<E> {
pub attempt: usize,
pub kind: RetryOperationErrorKind<E>,
}
#[derive(Debug)]
pub enum RetryOperationErrorKind<E> {
OperationError(progenitor_client::Error<E>),
RetriesExhausted(progenitor_client::Error<E>),
}
impl<E> fmt::Display for RetryOperationErrorKind<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::OperationError(_) => {
f.write_str("progenitor API operation failed")
}
Self::RetriesExhausted(_) => f.write_str("retries exhausted"),
}
}
}
impl<E> fmt::Display for RetryOperationError<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "failed at attempt {}: ", self.attempt)?;
self.kind.fmt(f)
}
}
impl<E> Error for RetryOperationError<E>
where
E: fmt::Debug + 'static,
{
fn source(&self) -> Option<&(dyn Error + 'static)> {
match &self.kind {
RetryOperationErrorKind::OperationError(e)
| RetryOperationErrorKind::RetriesExhausted(e) => Some(e),
}
}
}
impl<E> RetryOperationError<E> {
pub fn is_not_found(&self) -> bool {
match &self.kind {
RetryOperationErrorKind::OperationError(e)
| RetryOperationErrorKind::RetriesExhausted(e) => {
e.status() == Some(http::StatusCode::NOT_FOUND)
}
}
}
}
#[derive(Debug)]
pub struct RetryOperationWhileError<E, GoneErr = Infallible> {
pub attempt: usize,
pub kind: RetryOperationWhileErrorKind<E, GoneErr>,
}
#[derive(Debug)]
pub enum RetryOperationWhileErrorKind<E, GoneErr = Infallible> {
Gone,
GoneCheckError(GoneErr),
OperationError(progenitor_client::Error<E>),
RetriesExhausted(progenitor_client::Error<E>),
}
impl<E, GoneErr> fmt::Display for RetryOperationWhileErrorKind<E, GoneErr> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Gone => f.write_str("remote server is gone"),
Self::GoneCheckError(_) => {
f.write_str("failed to determine whether remote server is gone")
}
Self::OperationError(_) => {
f.write_str("progenitor API operation failed")
}
Self::RetriesExhausted(_) => f.write_str("retries exhausted"),
}
}
}
impl<E, GoneErr> fmt::Display for RetryOperationWhileError<E, GoneErr> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "failed at attempt {}: ", self.attempt)?;
self.kind.fmt(f)
}
}
impl<E, GoneErr> Error for RetryOperationWhileError<E, GoneErr>
where
E: fmt::Debug + 'static,
GoneErr: Error + 'static,
{
fn source(&self) -> Option<&(dyn Error + 'static)> {
match &self.kind {
RetryOperationWhileErrorKind::Gone => None,
RetryOperationWhileErrorKind::GoneCheckError(e) => Some(e),
RetryOperationWhileErrorKind::OperationError(e)
| RetryOperationWhileErrorKind::RetriesExhausted(e) => Some(e),
}
}
}
impl<E, GoneErr> RetryOperationWhileError<E, GoneErr> {
pub fn is_not_found(&self) -> bool {
match &self.kind {
RetryOperationWhileErrorKind::OperationError(e)
| RetryOperationWhileErrorKind::RetriesExhausted(e) => {
e.status() == Some(http::StatusCode::NOT_FOUND)
}
RetryOperationWhileErrorKind::Gone
| RetryOperationWhileErrorKind::GoneCheckError(_) => false,
}
}
pub fn is_gone(&self) -> bool {
match &self.kind {
RetryOperationWhileErrorKind::Gone => true,
RetryOperationWhileErrorKind::GoneCheckError(_)
| RetryOperationWhileErrorKind::OperationError(_)
| RetryOperationWhileErrorKind::RetriesExhausted(_) => false,
}
}
}
#[derive(Debug)]
pub struct IndefiniteRetryOperationError<E> {
pub attempt: usize,
pub error: progenitor_client::Error<E>,
}
impl<E> fmt::Display for IndefiniteRetryOperationError<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"failed at attempt {}: progenitor API operation failed",
self.attempt,
)
}
}
impl<E> Error for IndefiniteRetryOperationError<E>
where
E: fmt::Debug + 'static,
{
fn source(&self) -> Option<&(dyn Error + 'static)> {
Some(&self.error)
}
}
impl<E> IndefiniteRetryOperationError<E> {
pub fn is_not_found(&self) -> bool {
self.error.status() == Some(http::StatusCode::NOT_FOUND)
}
}
#[derive(Debug)]
pub struct IndefiniteRetryOperationWhileError<E, GoneErr = Infallible> {
pub attempt: usize,
pub kind: IndefiniteRetryOperationWhileErrorKind<E, GoneErr>,
}
#[derive(Debug)]
pub enum IndefiniteRetryOperationWhileErrorKind<E, GoneErr = Infallible> {
Gone,
GoneCheckError(GoneErr),
OperationError(progenitor_client::Error<E>),
}
impl<E, GoneErr> fmt::Display
for IndefiniteRetryOperationWhileErrorKind<E, GoneErr>
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Gone => f.write_str("remote server is gone"),
Self::GoneCheckError(_) => {
f.write_str("failed to determine whether remote server is gone")
}
Self::OperationError(_) => {
f.write_str("progenitor API operation failed")
}
}
}
}
impl<E, GoneErr> fmt::Display
for IndefiniteRetryOperationWhileError<E, GoneErr>
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "failed at attempt {}: ", self.attempt)?;
self.kind.fmt(f)
}
}
impl<E, GoneErr> Error for IndefiniteRetryOperationWhileError<E, GoneErr>
where
E: fmt::Debug + 'static,
GoneErr: Error + 'static,
{
fn source(&self) -> Option<&(dyn Error + 'static)> {
match &self.kind {
IndefiniteRetryOperationWhileErrorKind::Gone => None,
IndefiniteRetryOperationWhileErrorKind::GoneCheckError(e) => {
Some(e)
}
IndefiniteRetryOperationWhileErrorKind::OperationError(e) => {
Some(e)
}
}
}
}
impl<E, GoneErr> IndefiniteRetryOperationWhileError<E, GoneErr> {
pub fn is_not_found(&self) -> bool {
match &self.kind {
IndefiniteRetryOperationWhileErrorKind::OperationError(e) => {
e.status() == Some(http::StatusCode::NOT_FOUND)
}
IndefiniteRetryOperationWhileErrorKind::Gone
| IndefiniteRetryOperationWhileErrorKind::GoneCheckError(_) => {
false
}
}
}
pub fn is_gone(&self) -> bool {
match &self.kind {
IndefiniteRetryOperationWhileErrorKind::Gone => true,
IndefiniteRetryOperationWhileErrorKind::GoneCheckError(_)
| IndefiniteRetryOperationWhileErrorKind::OperationError(_) => {
false
}
}
}
}
pub fn default_retry_policy() -> ExponentialBuilder {
ExponentialBuilder::default()
.with_factor(2.0)
.with_min_delay(Duration::from_millis(167))
.with_max_delay(Duration::from_secs(60 * 3))
.with_max_times(13)
.with_jitter()
}
#[derive(Clone, Copy, Debug)]
pub struct IndefiniteBackoffParams {
pub factor: f32,
pub min_delay: Duration,
pub max_delay: Duration,
pub jitter: bool,
}
impl IndefiniteBackoffParams {
fn build(self) -> impl Iterator<Item = Duration> {
let mut builder = ExponentialBuilder::default()
.with_factor(self.factor)
.with_min_delay(self.min_delay)
.with_max_delay(self.max_delay)
.without_max_times()
.with_total_delay(None);
if self.jitter {
builder = builder.with_jitter();
}
builder.build()
}
}
pub fn default_indefinite_retry_policy() -> IndefiniteBackoffParams {
IndefiniteBackoffParams {
factor: 2.0,
min_delay: Duration::from_millis(167),
max_delay: Duration::from_secs(60 * 3),
jitter: true,
}
}
#[derive(Debug)]
#[non_exhaustive]
pub struct RetryNotification<E> {
pub attempt: usize,
pub error: progenitor_client::Error<E>,
pub delay: Duration,
}
pub async fn retry_operation<T, E, B, N, F, Fut>(
backoff: B,
mut operation: F,
mut notify: N,
) -> Result<T, RetryOperationError<E>>
where
B: BackoffBuilder,
N: FnMut(RetryNotification<E>),
F: FnMut() -> Fut,
Fut: Future<Output = Result<T, progenitor_client::Error<E>>>,
{
let mut delays = backoff.build();
let mut attempt = 1;
loop {
match (operation)().await {
Ok(v) => return Ok(v),
Err(error) => {
if !error.is_retryable() {
return Err(RetryOperationError {
attempt,
kind: RetryOperationErrorKind::OperationError(error),
});
}
match delays.next() {
Some(delay) => {
notify(RetryNotification { attempt, error, delay });
tokio::time::sleep(delay).await;
attempt += 1;
}
None => {
return Err(RetryOperationError {
attempt,
kind: RetryOperationErrorKind::RetriesExhausted(
error,
),
});
}
}
}
}
}
}
pub async fn retry_operation_while<T, E, GoneErr, B, N, F, Fut, GF, GFut>(
backoff: B,
mut operation: F,
mut gone_check: GF,
mut notify: N,
) -> Result<T, RetryOperationWhileError<E, GoneErr>>
where
B: BackoffBuilder,
N: FnMut(RetryNotification<E>),
F: FnMut() -> Fut,
Fut: Future<Output = Result<T, progenitor_client::Error<E>>>,
GF: FnMut() -> GFut,
GFut: Future<Output = Result<GoneCheckResult, GoneErr>>,
{
let mut delays = backoff.build();
let mut attempt = 1;
loop {
match (gone_check)().await {
Ok(GoneCheckResult::Gone) => {
return Err(RetryOperationWhileError {
attempt,
kind: RetryOperationWhileErrorKind::Gone,
});
}
Ok(GoneCheckResult::StillAvailable) => {}
Err(e) => {
return Err(RetryOperationWhileError {
attempt,
kind: RetryOperationWhileErrorKind::GoneCheckError(e),
});
}
}
match (operation)().await {
Ok(v) => return Ok(v),
Err(error) => {
if !error.is_retryable() {
return Err(RetryOperationWhileError {
attempt,
kind: RetryOperationWhileErrorKind::OperationError(
error,
),
});
}
match delays.next() {
Some(delay) => {
notify(RetryNotification { attempt, error, delay });
tokio::time::sleep(delay).await;
attempt += 1;
}
None => {
return Err(RetryOperationWhileError {
attempt,
kind:
RetryOperationWhileErrorKind::RetriesExhausted(
error,
),
});
}
}
}
}
}
}
pub async fn retry_operation_indefinitely<T, E, N, F, Fut>(
backoff: IndefiniteBackoffParams,
mut operation: F,
mut notify: N,
) -> Result<T, IndefiniteRetryOperationError<E>>
where
N: FnMut(RetryNotification<E>),
F: FnMut() -> Fut,
Fut: Future<Output = Result<T, progenitor_client::Error<E>>>,
{
let mut delays = backoff.build();
let mut attempt = 1;
loop {
match (operation)().await {
Ok(v) => return Ok(v),
Err(error) => {
if !error.is_retryable() {
return Err(IndefiniteRetryOperationError {
attempt,
error,
});
}
let delay = delays.next().unwrap_or_else(|| {
panic!(
"infinite backoff iterator produced a delay \
at attempt {attempt} (was usize::MAX exceeded?)"
)
});
notify(RetryNotification { attempt, error, delay });
tokio::time::sleep(delay).await;
attempt += 1;
}
}
}
}
pub async fn retry_operation_while_indefinitely<
T,
E,
GoneErr,
N,
F,
Fut,
GF,
GFut,
>(
backoff: IndefiniteBackoffParams,
mut operation: F,
mut gone_check: GF,
mut notify: N,
) -> Result<T, IndefiniteRetryOperationWhileError<E, GoneErr>>
where
N: FnMut(RetryNotification<E>),
F: FnMut() -> Fut,
Fut: Future<Output = Result<T, progenitor_client::Error<E>>>,
GF: FnMut() -> GFut,
GFut: Future<Output = Result<GoneCheckResult, GoneErr>>,
{
let mut delays = backoff.build();
let mut attempt = 1;
loop {
match (gone_check)().await {
Ok(GoneCheckResult::Gone) => {
return Err(IndefiniteRetryOperationWhileError {
attempt,
kind: IndefiniteRetryOperationWhileErrorKind::Gone,
});
}
Ok(GoneCheckResult::StillAvailable) => {}
Err(e) => {
return Err(IndefiniteRetryOperationWhileError {
attempt,
kind:
IndefiniteRetryOperationWhileErrorKind::GoneCheckError(
e,
),
});
}
}
match (operation)().await {
Ok(v) => return Ok(v),
Err(error) => {
if !error.is_retryable() {
return Err(IndefiniteRetryOperationWhileError {
attempt,
kind:
IndefiniteRetryOperationWhileErrorKind::OperationError(
error,
),
});
}
let delay = delays.next().unwrap_or_else(|| {
panic!(
"infinite backoff iterator produced a delay \
at attempt {attempt} (was usize::MAX exceeded?)"
)
});
notify(RetryNotification { attempt, error, delay });
tokio::time::sleep(delay).await;
attempt += 1;
}
}
}
}