1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
// Generates the async `retry` and `retry_unpin` functions for one runtime.
//
// `$time` is an expression that can be called with a `Duration` and returns a
// future; that future is awaited between attempts. Both generated functions
// use it, so the macro never names a specific runtime's timer itself.
macro_rules! retry_impl {
    ($time:expr) => {
        use crate::{RetryErr, RetryOp, RetryResult};
        use std::{future::Future, time::Duration};

        /// Retry an operation, waiting between attempts for the durations
        /// produced by an iterator.
        ///
        /// `f` is called once per item of `iter` with a [`RetryOp`] describing
        /// how many retries have happened and how much delay has accumulated so
        /// far. The future it returns is awaited and its [`RetryResult`] decides
        /// what happens next: `Retry` waits for the current duration and moves
        /// on to the next item, while `Ok` and `Err` finish immediately.
        ///
        /// ```rust,no_run
        /// # use std::{io, sync::{Arc, Mutex}};
        /// use retry_fn::strategy::Constant;
        /// use retry_fn::RetryResult;
        /// # use retry_fn::tokio::retry;
        /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
        /// # tokio::task::spawn_blocking(|| async move {
        /// let count: Arc<Mutex<i32>> = Arc::new(Mutex::new(0));
        /// let res = retry(Constant::from_millis(100), |op| {
        ///     let count = count.clone();
        ///     async move {
        ///         if op.retries >= 3 {
        ///             RetryResult::<&str, _>::Err(io::Error::new(
        ///                 io::ErrorKind::TimedOut,
        ///                 "timed out",
        ///             ))
        ///         } else {
        ///             *count.lock().unwrap() += 1;
        ///             RetryResult::Retry()
        ///         }
        ///     }
        /// })
        /// .await;
        /// assert_eq!(*count.lock().unwrap(), 3);
        /// assert!(res.is_err());
        /// # });
        /// # Ok(())
        /// # }
        /// ```
        ///
        /// # Returns
        /// `Ok(val)` as soon as the operation yields `RetryResult::Ok(val)`.
        ///
        /// # Errors
        /// * [`RetryErr::FailedAttempt`] when the operation yields
        ///   `RetryResult::Err`; no further attempts are made.
        /// * [`RetryErr::IteratorEnded`] when `iter` runs out while the
        ///   operation is still asking to be retried.
        ///
        /// Both variants carry the number of retries performed and the total
        /// delay that was waited.
        pub async fn retry<I, F, Fut, T, E>(iter: I, mut f: F) -> Result<T, RetryErr<E>>
        where
            I: IntoIterator<Item = Duration>,
            F: FnMut(RetryOp) -> Fut,
            Fut: Future<Output = RetryResult<T, E>>,
        {
            let mut count = 0;
            let mut total_delay = Duration::from_millis(0);
            for dur in iter {
                match f(RetryOp {
                    retries: count,
                    total_delay,
                })
                .await
                {
                    RetryResult::Retry() => {
                        // Wait with the timer supplied to the macro, then
                        // account for the delay and the retry.
                        $time(dur).await;
                        total_delay += dur;
                        count += 1;
                    }
                    RetryResult::Err(err) => {
                        return Err(RetryErr::FailedAttempt {
                            tries: count,
                            total_delay,
                            err,
                        });
                    }
                    RetryResult::Ok(val) => {
                        return Ok(val);
                    }
                }
            }
            Err(RetryErr::IteratorEnded {
                tries: count,
                total_delay,
            })
        }

        /// Retry a single `Unpin` future, waiting between attempts for the
        /// durations produced by an iterator.
        ///
        /// Unlike [`retry`], no new future is created per attempt: the same `f`
        /// is awaited again through `&mut f` on every iteration, including after
        /// it has already produced a value. `f` must therefore be a future that
        /// may be polled again after completing. Futures created by `async`
        /// blocks or `async fn` panic when polled after completion, so they only
        /// work here if their first result is `Ok` or `Err`.
        ///
        /// ```rust,no_run
        /// # use std::{future::Future, io, pin::Pin, task::{Context, Poll}};
        /// use retry_fn::strategy::Constant;
        /// use retry_fn::RetryResult;
        /// # use retry_fn::tokio::retry_unpin;
        /// // A hand-written future that can be polled again after completing.
        /// struct Flaky {
        ///     attempts: u32,
        /// }
        ///
        /// impl Future for Flaky {
        ///     type Output = RetryResult<&'static str, io::Error>;
        ///
        ///     fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        ///         if self.attempts >= 3 {
        ///             Poll::Ready(RetryResult::Err(io::Error::new(
        ///                 io::ErrorKind::TimedOut,
        ///                 "timed out",
        ///             )))
        ///         } else {
        ///             self.attempts += 1;
        ///             Poll::Ready(RetryResult::Retry())
        ///         }
        ///     }
        /// }
        /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
        /// # tokio::task::spawn_blocking(|| async move {
        /// let mut fut = Flaky { attempts: 0 };
        /// let res = retry_unpin(Constant::from_millis(100), &mut fut).await;
        /// assert_eq!(fut.attempts, 3);
        /// assert!(res.is_err());
        /// # });
        /// # Ok(())
        /// # }
        /// ```
        ///
        /// # Returns
        /// `Ok(val)` as soon as the future yields `RetryResult::Ok(val)`.
        ///
        /// # Errors
        /// * [`RetryErr::FailedAttempt`] when the future yields
        ///   `RetryResult::Err`; no further attempts are made.
        /// * [`RetryErr::IteratorEnded`] when `iter` runs out while the future
        ///   is still asking to be retried.
        ///
        /// Both variants carry the number of retries performed and the total
        /// delay that was waited.
        pub async fn retry_unpin<I, Fut, T, E>(iter: I, mut f: Fut) -> Result<T, RetryErr<E>>
        where
            I: IntoIterator<Item = Duration>,
            Fut: Future<Output = RetryResult<T, E>> + Unpin,
        {
            let mut count = 0;
            let mut total_delay = Duration::from_millis(0);
            for dur in iter {
                // `&mut Fut` is itself a future because `Fut: Unpin`, which lets
                // the same value be awaited on every iteration.
                match (&mut f).await {
                    RetryResult::Retry() => {
                        // Use the timer supplied to the macro (same as `retry`)
                        // rather than a specific runtime's sleep function.
                        $time(dur).await;
                        total_delay += dur;
                        count += 1;
                    }
                    RetryResult::Err(err) => {
                        return Err(RetryErr::FailedAttempt {
                            tries: count,
                            total_delay,
                            err,
                        });
                    }
                    RetryResult::Ok(val) => {
                        return Ok(val);
                    }
                }
            }
            Err(RetryErr::IteratorEnded {
                tries: count,
                total_delay,
            })
        }
    };
}