use std::time::Duration;
pub mod s3;
pub mod sftp;
pub async fn retry_with_backoff<F, Fut, T, E>(
max_attempts: u32,
initial_delay: Duration,
f: F,
) -> Result<T, E>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<T, E>>,
E: std::fmt::Debug,
{
let mut current_attempt = 0;
loop {
let result = f().await;
let delay = initial_delay * (1 << current_attempt);
current_attempt += 1;
if let Err(error) = result.as_ref()
&& current_attempt < max_attempts
{
tracing::warn!(
message = "Operation failed. Retrying...",
attempt = current_attempt,
max_attempts = max_attempts,
?error,
?delay
);
tokio::time::sleep(delay).await;
} else {
return result;
}
}
}
#[cfg(any(feature = "auth", test))]
const fn cumulative_backoff_duration(initial_delay: Duration, attempts: u32) -> Duration {
initial_delay.checked_mul((1 << attempts) - 1).unwrap()
}
#[cfg(test)]
mod tests {
use std::time::Duration;
#[test]
#[cfg(feature = "auth")]
fn max_backoff_duration() {
assert_eq!(
super::cumulative_backoff_duration(Duration::from_secs(8), 5),
Duration::from_secs(8 + 16 + 32 + 64 + 128)
);
assert_eq!(
super::cumulative_backoff_duration(Duration::from_secs(5), 20),
Duration::from_secs((0..20).map(|i| 5 * (1 << i)).sum())
)
}
#[tokio::test]
async fn retry_with_backoff_success() {
use std::sync::{Arc, Mutex};
use tokio::time::Instant;
let call_count = Arc::new(Mutex::new(0));
const SUCCESSFUL_ATTEMPT: u32 = 3;
let mock_op = || {
let call_count = call_count.clone();
async move {
let mut count = call_count.lock().unwrap();
*count += 1;
if *count >= SUCCESSFUL_ATTEMPT {
Ok("Success")
} else {
Err("Failure")
}
}
};
const MAX_ATTEMPTS: u32 = 5;
const INITIAL_DELAY: Duration = Duration::from_millis(10);
let start = Instant::now();
let result = super::retry_with_backoff(MAX_ATTEMPTS, INITIAL_DELAY, mock_op).await;
let duration = start.elapsed();
assert_eq!(result, Ok("Success"));
assert_eq!(*call_count.lock().unwrap(), SUCCESSFUL_ATTEMPT);
assert!(
duration >= super::cumulative_backoff_duration(INITIAL_DELAY, SUCCESSFUL_ATTEMPT - 1)
);
}
#[tokio::test]
async fn retry_with_backoff_failure() {
use std::sync::{Arc, Mutex};
use tokio::time::Instant;
let call_count = Arc::new(Mutex::new(0));
let mock_op = || {
let call_count = call_count.clone();
async move {
let mut count = call_count.lock().unwrap();
*count += 1;
Err("Failure")
}
};
const MAX_ATTEMPTS: u32 = 3;
const INITIAL_DELAY: Duration = Duration::from_millis(10);
let start = Instant::now();
let result: Result<&str, &str> =
super::retry_with_backoff(MAX_ATTEMPTS, INITIAL_DELAY, mock_op).await;
let duration = start.elapsed();
assert_eq!(result, Err("Failure"));
assert_eq!(*call_count.lock().unwrap(), 3);
assert!(duration >= super::cumulative_backoff_duration(INITIAL_DELAY, MAX_ATTEMPTS - 1));
}
}