1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
use std::marker::PhantomData;
use futures::{future::BoxFuture, FutureExt};
use tower::retry::Policy;
/// Retry policy that is either switched off or retries errors accepted by a
/// predicate.
///
/// `T` and `U` only occur inside a `PhantomData<fn() -> (T, U)>` marker, so no
/// value of either type is ever stored in the policy. In the `Policy`
/// implementation of this file they are the request and response types.
pub enum RetryPolicy<T, U, F> {
    /// Retry whenever the predicate accepts the error.
    On {
        /// Predicate called with the error to decide whether to retry.
        f: F,
        /// Number of retries already scheduled; it starts at 0 and is used to
        /// derive the delay before the next attempt.
        times: usize,
    },
    /// Never retry.
    Never(PhantomData<fn() -> (T, U)>),
}

// Written by hand instead of `#[derive(Debug)]`: the derive would demand
// `T: Debug` and `U: Debug` although neither type is ever stored. The output
// is the same as what the derive produces.
impl<T, U, F> std::fmt::Debug for RetryPolicy<T, U, F>
where
    F: std::fmt::Debug,
{
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::On { f, times } => formatter
                .debug_struct("On")
                .field("f", f)
                .field("times", times)
                .finish(),
            Self::Never(marker) => formatter.debug_tuple("Never").field(marker).finish(),
        }
    }
}
impl<T, U, F: Clone> Clone for RetryPolicy<T, U, F> {
fn clone(&self) -> Self {
match self {
Self::Never(_) => Self::Never(PhantomData),
Self::On { f, times } => Self::On {
f: f.clone(),
times: *times,
},
}
}
}
/// The policy is `Copy` whenever its predicate type `F` is; `T` and `U` carry
/// no bound.
impl<T, U, F> Copy for RetryPolicy<T, U, F> where F: Copy {}
impl<T, U, F> Default for RetryPolicy<T, U, F> {
fn default() -> Self {
Self::never()
}
}
impl<T, U, F> RetryPolicy<T, U, F> {
    /// Creates a policy that never retries.
    pub fn never() -> Self {
        RetryPolicy::Never(PhantomData)
    }

    /// Builds a policy that retries whenever `f` returns `true` for an error.
    ///
    /// The receiver is consumed and its contents are discarded; only its `T`
    /// and `U` type parameters carry over. The returned policy starts with a
    /// retry counter of zero.
    pub fn retry_on<E, F2>(self, f: F2) -> RetryPolicy<T, U, F2>
    where
        F2: Fn(&E) -> bool + Send + Clone + 'static,
    {
        let times = 0;
        RetryPolicy::On { f, times }
    }
}
/// `tower` retry policy: errors accepted by the predicate are retried after a
/// delay that doubles with each attempt (1s, 2s, 4s, ...) and is capped at
/// 128 seconds.
impl<T, U, E, F> Policy<T, U, E> for RetryPolicy<T, U, F>
where
    T: 'static + Clone,
    U: 'static,
    F: Fn(&E) -> bool,
    F: Send + 'static + Clone,
{
    type Future = BoxFuture<'static, Self>;

    /// Decides whether the request should be retried.
    ///
    /// Returns `None` when the policy is `Never`, when `result` is `Ok`, or
    /// when the predicate rejects the error. Otherwise returns a future that
    /// sleeps for the backoff delay and then resolves to the policy to use for
    /// the next attempt, with its retry counter advanced by one.
    fn retry(&self, _req: &T, result: Result<&U, &E>) -> Option<Self::Future> {
        // 1 << 7 == 128, the maximum delay in seconds.
        const MAX_BACKOFF_SHIFT: usize = 7;

        let (f, times) = match self {
            Self::On { f, times } => (f, *times),
            Self::Never(_) => return None,
        };
        // Successful responses are never retried.
        let err = result.err()?;
        if !f(err) {
            tracing::trace!("retry given up;");
            return None;
        }

        // Clamp the exponent *before* shifting. `1 << times` overflows once
        // `times` reaches 64: that panics in debug builds and wraps the shift
        // amount in release builds, which would drop the delay back to 1s.
        let secs = 1u64 << times.min(MAX_BACKOFF_SHIFT);
        tracing::trace!("retry in {secs}s;");
        let retry = Self::On {
            f: f.clone(),
            times: times.saturating_add(1),
        };
        let fut = async move {
            tokio::time::sleep(std::time::Duration::from_secs(secs)).await;
            retry
        }
        .boxed();
        Some(fut)
    }

    /// Always clones the request so that it can be sent again.
    fn clone_request(&self, req: &T) -> Option<T> {
        Some(req.clone())
    }
}