use std::{
cmp::max,
fmt::Display,
future::Future,
time::{Duration, SystemTime},
};
use tokio::time::sleep;
use tracing::{error, instrument, trace, trace_span};
use tracing_futures::Instrument;
use crate::errors::*;
/// Minimum number of seconds `wait` sleeps between attempts, applied even when
/// the configured retry interval is shorter.
const MIN_SLEEP_SECS: u64 = 4;
/// How the retry interval used by `wait` evolves from one attempt to the next.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum BackoffType {
/// The retry interval is left unchanged after every attempt.
Linear,
/// The retry interval is doubled after every sleep.
Exponential,
}
/// Settings that control how `wait` retries an operation.
///
/// Obtain the defaults with `WaitOptions::default()` and adjust them through
/// the chained setter methods.
#[derive(Debug)]
pub struct WaitOptions {
/// Overall time limit for the wait; `None` means no deadline is enforced.
timeout: Option<Duration>,
/// Initial interval between attempts.
retry_interval: Duration,
/// Whether the interval stays fixed or doubles between attempts.
backoff_type: BackoffType,
/// Number of temporary failures that are retried before `wait` gives up.
allowed_errors: u16,
}
impl WaitOptions {
    /// Returns these options with the overall time limit replaced.
    ///
    /// Accepts either a `Duration` or an `Option<Duration>`; `None` removes
    /// the limit.
    pub fn timeout<D: Into<Option<Duration>>>(self, timeout: D) -> Self {
        Self {
            timeout: timeout.into(),
            ..self
        }
    }

    /// Returns these options with the initial retry interval replaced.
    pub fn retry_interval(self, interval: Duration) -> Self {
        Self {
            retry_interval: interval,
            ..self
        }
    }

    /// Returns these options with the backoff strategy replaced.
    pub fn backoff_type(self, backoff_type: BackoffType) -> Self {
        Self {
            backoff_type,
            ..self
        }
    }

    /// Returns these options with the number of tolerated temporary failures
    /// replaced.
    pub fn allowed_errors(self, count: u16) -> Self {
        Self {
            allowed_errors: count,
            ..self
        }
    }
}
impl Default for WaitOptions {
fn default() -> Self {
Self {
timeout: None,
retry_interval: Duration::from_secs(10),
backoff_type: BackoffType::Linear,
allowed_errors: 2,
}
}
}
/// Outcome of a single attempt, as reported by the closure passed to `wait`.
pub enum WaitStatus<T, E> {
/// The operation completed; `wait` returns `Ok` with this value.
Finished(T),
/// The operation has not completed yet; `wait` sleeps and tries again.
Waiting,
/// The attempt failed in a way that may succeed later; `wait` retries until
/// the allowed number of such errors has been used up.
FailedTemporarily(E),
/// The attempt failed for good; `wait` returns this error immediately.
FailedPermanently(E),
}
impl<T> From<Error> for WaitStatus<T, Error> {
fn from(error: Error) -> Self {
if error.might_be_temporary() {
WaitStatus::FailedTemporarily(error)
} else {
WaitStatus::FailedPermanently(error)
}
}
}
/// Unwraps a `Result` inside a function that returns a `WaitStatus`.
///
/// On `Ok` the macro evaluates to the contained value. On `Err` it returns
/// early from the enclosing function with the error converted through
/// `WaitStatus::<_, Error>::from`, which decides whether the failure is
/// temporary or permanent.
#[macro_export]
macro_rules! try_wait {
    ($result:expr) => {
        match $result {
            Err(error) => return $crate::wait::WaitStatus::<_, $crate::Error>::from(error),
            Ok(value) => value,
        }
    };
}
/// Unwraps a `Result`, treating any error as a temporary failure.
///
/// On `Ok` the macro evaluates to the contained value. On `Err` it returns
/// early from the enclosing function with
/// `WaitStatus::FailedTemporarily`, converting the error with `.into()`.
#[macro_export]
macro_rules! try_with_temporary_failure {
    ($result:expr) => {
        match $result {
            Err(error) => return $crate::wait::WaitStatus::FailedTemporarily(error.into()),
            Ok(value) => value,
        }
    };
}
/// Unwraps a `Result`, treating any error as a permanent failure.
///
/// On `Ok` the macro evaluates to the contained value. On `Err` it returns
/// early from the enclosing function with
/// `WaitStatus::FailedPermanently`, converting the error with `.into()`.
#[macro_export]
macro_rules! try_with_permanent_failure {
    ($result:expr) => {
        match $result {
            Err(error) => return $crate::wait::WaitStatus::FailedPermanently(error.into()),
            Ok(value) => value,
        }
    };
}
/// Calls `f` repeatedly until it finishes, fails for good, or the timeout
/// would be exceeded.
///
/// Each call to `f` yields a future that resolves to a [`WaitStatus`]:
///
/// - `Finished(value)` ends the wait with `Ok(value)`.
/// - `Waiting` schedules another attempt.
/// - `FailedTemporarily(err)` is logged and retried until
///   `options.allowed_errors` such failures have been seen in total (the
///   counter is never reset); the next one ends the wait with `Err(err)`.
/// - `FailedPermanently(err)` ends the wait with `Err(err)` immediately.
///
/// Between attempts the task sleeps for the current retry interval, but never
/// for less than `MIN_SLEEP_SECS` seconds. With [`BackoffType::Exponential`]
/// the retry interval is doubled after every sleep.
///
/// # Errors
///
/// Returns the error reported by `f` as described above, or `Error::Timeout`
/// (converted into `E`) when `options.timeout` is set and the next attempt
/// would start after the deadline.
// NOTE(review): presumably kept because the `#[instrument]` expansion can trip
// this lint on async fns that take references; confirm before removing.
#[allow(clippy::needless_lifetimes)]
#[instrument(level = "trace", skip(f))]
pub async fn wait<T, E, F, R>(options: &WaitOptions, mut f: F) -> Result<T, E>
where
    F: FnMut() -> R,
    R: Future<Output = WaitStatus<T, E>>,
    E: Display,
    Error: Into<E>,
{
    // NOTE(review): `SystemTime` is wall-clock time and not monotonic, so a
    // clock adjustment during the wait shifts the effective timeout.
    let deadline = options.timeout.map(|to| SystemTime::now() + to);
    let mut retry_interval = options.retry_interval;
    trace!(
        "waiting with deadline {:?}, initial interval {:?}",
        deadline,
        retry_interval
    );

    let mut errors_seen: u16 = 0;
    loop {
        let fut = f().instrument(trace_span!("wait_attempt", errors_seen));
        match fut.await {
            WaitStatus::Finished(value) => {
                trace!("wait finished successfully");
                return Ok(value);
            }
            WaitStatus::Waiting => trace!("waiting some more"),
            WaitStatus::FailedTemporarily(ref e)
                if errors_seen < options.allowed_errors =>
            {
                errors_seen += 1;
                error!(
                    "got error, will retry ({}/{}): {}",
                    errors_seen, options.allowed_errors, e,
                );
            }
            WaitStatus::FailedTemporarily(err) => {
                trace!("too many temporary failures, giving up on wait: {}", err);
                return Err(err);
            }
            WaitStatus::FailedPermanently(err) => {
                trace!("permanent failure, giving up on wait: {}", err);
                return Err(err);
            }
        }

        // Work out how long we will really sleep *before* checking the
        // deadline. The minimum-sleep floor can make the sleep longer than
        // `retry_interval`, and the deadline check must use that real value,
        // otherwise an attempt can still run after the timeout has expired.
        let duration = max(Duration::from_secs(MIN_SLEEP_SECS), retry_interval);

        if let Some(deadline) = deadline {
            let next_attempt = SystemTime::now() + duration;
            if next_attempt > deadline {
                trace!(
                    "next attempt {:?} would fall after deadline {:?}, ending wait",
                    next_attempt,
                    deadline
                );
                return Err(Error::Timeout {}.into());
            }
        }

        sleep(duration).await;

        match options.backoff_type {
            BackoffType::Linear => {}
            BackoffType::Exponential => {
                retry_interval *= 2;
                trace!("next retry doubled to {:?}", retry_interval);
            }
        }
    }
}