1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
use std::sync::Arc;
use backoff::{backoff::Backoff, ExponentialBackoff};
use tokio::sync::Mutex;
/// Shared handle to a backoff policy.
///
/// The policy is stored as a trait object (`dyn Backoff + Send + Sync`) behind
/// an async `Mutex` inside an `Arc`. Cloning a `Retry` clones the `Arc`, so
/// every clone observes and mutates the same policy state.
#[derive(Clone)]
pub struct Retry(pub Arc<Mutex<dyn Backoff + Send + Sync>>);
impl Default for Retry {
fn default() -> Retry {
Self(Arc::new(Mutex::new(ExponentialBackoff::default())))
}
}
/// Runs an async attempt block in a loop, sleeping between attempts according
/// to a shared backoff policy, until it succeeds, fails permanently, exhausts
/// the policy, or is cancelled.
///
/// Must be expanded inside an `async` context, because the expansion uses
/// `.await`.
///
/// # Arguments
///
/// * `$cancel` - expression with an async `cancelled()` method; it is raced
///   against the sleep between attempts.
/// * `$retry` - expression whose `.0` field is an async mutex guarding a
///   `backoff::backoff::Backoff` policy (for example a `Retry`).
/// * `$attempt` - block executed inside an `async` block whose error type is
///   `backoff::Error<$crate::Error>`, so `?` inside it can signal a transient
///   or permanent failure. Falling off the end of the block counts as success.
/// * `$exhausted` - optional block executed once when the policy yields no
///   further delay, just before the last transient error is returned.
///
/// `$cancel` and `$retry` are re-evaluated each time they are used, so pass
/// cheap place expressions.
///
/// # Result
///
/// Evaluates to `Result<(), $crate::Error>`:
///
/// * `Ok(())` once the attempt block completes without error.
/// * `Err(e)` for a `backoff::Error::Permanent(e)`, without retrying.
/// * `Err($crate::CancelError.into())` if `$cancel` fires while waiting.
/// * `Err(e)` carrying the last transient error once the policy is exhausted.
///
/// # Policy handling
///
/// The policy is reset on entry, so every invocation starts from a fresh
/// interval and elapsed-time budget regardless of how long the handle has
/// existed. A `retry_after` carried by a transient error takes precedence over
/// the policy, and the policy is not advanced in that case. The policy lock is
/// held only while querying the policy, never while sleeping or while running
/// `$exhausted`.
#[macro_export]
macro_rules! backoff_retry {
    ($cancel:expr, $retry:expr, $attempt:block, $exhausted:block) => {
        {
            // Start from a fresh policy: without this, a handle that has been
            // alive for a long time (or that previously succeeded after a few
            // retries) would carry a stale clock/interval into this run and
            // could report exhaustion on the very first transient failure.
            $retry.0.lock().await.reset();
            let mut _final_result: ::std::result::Result<(), $crate::Error> = Ok(());
            let mut _attempt = 0u32;
            loop {
                // Wrapping the block in `async` gives `?` a well-defined error
                // type and keeps `return` local to a single attempt.
                let _result = async {
                    $attempt
                    Ok::<(), backoff::Error<$crate::Error>>(())
                }.await;
                match _result {
                    Ok(()) => {
                        if _attempt > 0 {
                            tracing::info!(attempt = _attempt, "retry successful");
                        }
                        _final_result = Ok(());
                        break;
                    }
                    Err(backoff::Error::Permanent(err)) => {
                        _final_result = Err(err);
                        break;
                    }
                    Err(backoff::Error::Transient { err, retry_after }) => {
                        // Query the policy in its own scope so the guard is
                        // released before sleeping; other clones of the handle
                        // are then never blocked for the duration of a delay.
                        let _next_delay = {
                            let mut _retry = $retry.0.lock().await;
                            // Lazy fallback: only advance the policy when the
                            // error did not supply its own delay.
                            let _next = retry_after.or_else(|| _retry.next_backoff());
                            if _next.is_none() {
                                // Exhausted: leave the policy ready for reuse.
                                _retry.reset();
                            }
                            _next
                        };
                        match _next_delay {
                            Some(delay) => {
                                _attempt += 1;
                                tokio::select! {
                                    _ = $cancel.cancelled() => {
                                        _final_result = Err($crate::CancelError.into());
                                        break;
                                    }
                                    _ = tokio::time::sleep(delay) => {}
                                };
                                tracing::warn!(?delay, attempt = _attempt, "retrying");
                            }
                            None => {
                                $exhausted
                                _final_result = Err(err);
                                break;
                            }
                        }
                    }
                }
            }
            _final_result
        }
    };
    ($cancel:expr, $retry:expr, $attempt:block) => {
        // `$crate::` keeps the recursive call resolvable when the macro is
        // invoked by path from another crate.
        $crate::backoff_retry!($cancel, $retry, $attempt, {})
    }
}