use crate::{Error, Result};
use rand::RngExt;
use std::{future::Future, ops::Range, time::Duration};
/// Settings that control how a rate-limited request is retried.
#[derive(Debug, Clone)]
pub struct RetryOptions {
/// Fallback wait before a retry, in milliseconds, used when the rate-limit
/// error carries no reset value.
pub duration_ms: u64,
/// Range, in milliseconds, from which a random extra delay is drawn and
/// added to the wait before each retry.
pub jitter_range_ms: Range<u64>,
/// Maximum number of retries made after the initial attempt; `0` disables
/// retrying.
pub max_retries: u32,
}
impl Default for RetryOptions {
    /// Builds the default retry policy: a 1000 ms fallback wait, jitter drawn
    /// from `0..30` ms, and up to three retries.
    fn default() -> Self {
        const FALLBACK_WAIT_MS: u64 = 1_000;
        const JITTER_UPPER_BOUND_MS: u64 = 30;
        const RETRY_BUDGET: u32 = 3;

        Self {
            duration_ms: FALLBACK_WAIT_MS,
            jitter_range_ms: 0..JITTER_UPPER_BOUND_MS,
            max_retries: RETRY_BUDGET,
        }
    }
}
#[allow(clippy::too_long_first_doc_paragraph)] pub async fn send_with_retry_opts<A: Future<Output = Result<B>> + Send, B: Send>(
f: impl Fn() -> A + Send,
opts: &RetryOptions,
#[cfg(test)] retry_count: &mut u32,
) -> Result<B> {
let res = f().await;
if let Err(Error::RateLimit {
ratelimit_limit: _,
ratelimit_remaining: _,
ratelimit_reset,
}) = res
{
if opts.max_retries == 0 {
return res;
}
#[cfg(test)]
eprint!("Failed send, trying again...");
let opts = RetryOptions {
duration_ms: opts.duration_ms,
jitter_range_ms: opts.jitter_range_ms.clone(),
max_retries: opts.max_retries.saturating_sub(1),
};
let sleep_millis = ratelimit_reset.map_or(opts.duration_ms, |r| r.saturating_mul(1000));
let jitter = rand::rng().random_range(opts.jitter_range_ms.clone());
std::thread::sleep(Duration::from_millis(sleep_millis + jitter));
#[cfg(test)]
{
*retry_count += 1;
}
Box::pin(send_with_retry_opts(
f,
&opts,
#[cfg(test)]
retry_count,
))
.await
} else {
res
}
}
/// Runs `f` through [`send_with_retry_opts`] using `RetryOptions::default()`.
///
/// In test builds a throwaway retry counter is supplied, so the number of
/// retries is not observable through this wrapper.
#[allow(dead_code)]
pub async fn send_with_retry<A: Future<Output = Result<B>> + Send, B: Send>(
    f: impl Fn() -> A + Send,
) -> Result<B> {
    let default_opts = RetryOptions::default();
    #[cfg(test)]
    let mut discarded_retry_count = 0u32;

    send_with_retry_opts(
        f,
        &default_opts,
        #[cfg(test)]
        &mut discarded_retry_count,
    )
    .await
}
/// Retries the given expression with the default [`RetryOptions`].
///
/// The expression is wrapped in a closure, so it is evaluated again on every
/// attempt, and the expansion ends in `.await`, so the macro must be used in
/// an async context. `RetryOptions` and `send_with_retry_opts` are referenced
/// by bare name and therefore have to be in scope where the macro is invoked.
///
/// NOTE(review): the expansion passes two arguments, while in test builds
/// `send_with_retry_opts` takes an additional `retry_count` parameter —
/// confirm the macro is not expected to be invoked from test builds.
#[macro_export]
macro_rules! retry {
    ($request:expr) => {{
        let default_opts = RetryOptions::default();
        send_with_retry_opts(|| $request, &default_opts).await
    }};
}
/// Retries the given expression with caller-supplied retry options.
///
/// The first argument is wrapped in a closure, so it is evaluated again on
/// every attempt; the second argument is passed by reference as the options.
/// The expansion ends in `.await`, so the macro must be used in an async
/// context, and `send_with_retry_opts` has to be in scope where the macro is
/// invoked because it is referenced by bare name.
///
/// NOTE(review): the expansion passes two arguments, while in test builds
/// `send_with_retry_opts` takes an additional `retry_count` parameter —
/// confirm the macro is not expected to be invoked from test builds.
#[macro_export]
macro_rules! retry_opts {
    ($request:expr, $options:expr) => {{
        send_with_retry_opts(|| $request, &$options).await
    }};
}
#[cfg(test)]
#[allow(clippy::needless_return)]
mod test {
    use super::{RetryOptions, send_with_retry_opts};
    use crate::Error;

    /// A request that is always rate limited is retried exactly
    /// `max_retries` times and still ends in an error.
    #[tokio_shared_rt::test(shared = true)]
    #[cfg(not(feature = "blocking"))]
    async fn test_retry_count_err() {
        let always_rate_limited = || async {
            Result::<(), Error>::Err(Error::RateLimit {
                ratelimit_limit: Some(10),
                ratelimit_remaining: Some(10),
                ratelimit_reset: Some(1),
            })
        };

        // The default options allow three retries.
        let mut opts = RetryOptions::default();
        let mut retries = 0u32;
        let outcome = send_with_retry_opts(always_rate_limited, &opts, &mut retries).await;
        assert!(outcome.is_err());
        assert_eq!(retries, 3);

        // Explicit budgets, including "never retry".
        for budget in [2u32, 0] {
            opts.max_retries = budget;
            let mut retries = 0u32;
            let outcome = send_with_retry_opts(always_rate_limited, &opts, &mut retries).await;
            assert!(outcome.is_err());
            assert_eq!(retries, budget);
        }
    }

    /// A request that succeeds on the first attempt is never retried.
    #[tokio_shared_rt::test(shared = true)]
    #[cfg(not(feature = "blocking"))]
    async fn test_retry_count_ok() {
        let always_ok = || async { Result::<(), Error>::Ok(()) };
        let opts = RetryOptions::default();
        let mut retries = 0u32;

        let outcome = send_with_retry_opts(always_ok, &opts, &mut retries).await;

        assert!(outcome.is_ok());
        assert_eq!(retries, 0);
    }
}