1#![allow(clippy::type_repetition_in_bounds)]
51
52use backoff::backoff::Backoff;
53use backoff::Error;
54use std::future::Future;
55use std::time::Duration;
56
57struct BackoffFutureBuilder<'b, B, F, Fut, T, E>
58where
59 B: Backoff,
60 F: FnMut() -> Fut,
61 Fut: Future<Output = Result<T, Error<E>>>,
62{
63 backoff: &'b mut B,
64 f: F,
65}
66
67impl<'b, B, F, Fut, T, E> BackoffFutureBuilder<'b, B, F, Fut, T, E>
68where
69 B: Backoff,
70 F: FnMut() -> Fut,
71 Fut: Future<Output = Result<T, Error<E>>>,
72{
73 async fn fut<N: FnMut(&Error<E>, Duration)>(mut self, mut notify: N) -> Result<T, Error<E>> {
74 loop {
75 let work_result = (self.f)().await;
76 match work_result {
77 Ok(_) | Err(Error::Permanent(_)) => return work_result,
78 Err(err @ Error::Transient(_)) => {
79 if let Some(backoff_duration) = self.backoff.next_backoff() {
80 notify(&err, backoff_duration);
81 tokio::time::delay_for(backoff_duration).await
82 } else {
83 return Err(err);
84 }
85 }
86 }
87 }
88 }
89}
90
91#[async_trait::async_trait(?Send)]
92pub trait BackoffExt<T, E, Fut, F> {
93 #[deprecated(since = "0.3.1", note = "Use the built-in `backoff::future` async support")]
94 async fn with_backoff<B>(self, backoff: &mut B) -> Result<T, Error<E>>
107 where
108 B: Backoff,
109 T: 'async_trait,
110 E: 'async_trait,
111 Fut: 'async_trait;
112
113 #[deprecated(since = "0.3.1", note = "Use the built-in `backoff::future` async support")]
114 async fn with_backoff_notify<B, N>(self, backoff: &mut B, notify: N) -> Result<T, Error<E>>
117 where
118 B: Backoff,
119 N: FnMut(&Error<E>, Duration),
120 T: 'async_trait,
121 E: 'async_trait,
122 Fut: 'async_trait;
123}
124
125#[async_trait::async_trait(?Send)]
126impl<T, E, Fut, F> BackoffExt<T, E, Fut, F> for F
127 where
128 F: FnMut() -> Fut,
129 Fut: Future<Output = Result<T, backoff::Error<E>>> {
130
131 async fn with_backoff<B>(self, backoff: &mut B) -> Result<T, Error<E>>
132 where
133 B: Backoff,
134 T: 'async_trait,
135 E: 'async_trait,
136 Fut: 'async_trait
137 {
138 let backoff_struct = BackoffFutureBuilder { backoff, f: self };
139 backoff_struct.fut(|_, _| {}).await
140 }
141
142 async fn with_backoff_notify<B, N>(self, backoff: &mut B, notify: N) -> Result<T, Error<E>>
143 where
144 B: Backoff,
145 N: FnMut(&Error<E>, Duration),
146 T: 'async_trait,
147 E: 'async_trait,
148 Fut: 'async_trait
149 {
150 let backoff_struct = BackoffFutureBuilder { backoff, f: self };
151 backoff_struct.fut(notify).await
152 }
153}
154
// These tests deliberately exercise the deprecated `BackoffExt` methods, so the
// deprecation lint is silenced for this module only.
#[cfg(test)]
#[allow(deprecated)]
mod tests {
    use super::BackoffExt;
    use futures::Future;

    /// A plain `fn` returning an already-successful future yields its value.
    #[test]
    fn test_when_future_succeeds() {
        fn do_work() -> impl Future<Output = Result<u32, backoff::Error<()>>> {
            futures::future::ready(Ok(123))
        }

        let mut backoff = backoff::ExponentialBackoff::default();
        let result: Result<u32, backoff::Error<()>> =
            futures::executor::block_on(do_work.with_backoff(&mut backoff));
        assert_eq!(result.ok(), Some(123));
    }

    /// A closure returning a lazily evaluated successful future yields its value.
    #[test]
    fn test_with_closure_when_future_succeeds() {
        let do_work = || futures::future::lazy(|_| Ok(123));

        let mut backoff = backoff::ExponentialBackoff::default();
        let result: Result<u32, backoff::Error<()>> =
            futures::executor::block_on(do_work.with_backoff(&mut backoff));
        assert_eq!(result.ok(), Some(123));
    }

    /// A permanent error is returned to the caller as a permanent error.
    #[test]
    fn test_with_closure_when_future_fails_with_permanent_error() {
        use matches::assert_matches;

        let do_work = || {
            let result = Err(backoff::Error::Permanent(()));
            futures::future::ready(result)
        };

        let mut backoff = backoff::ExponentialBackoff::default();
        let result: Result<u32, backoff::Error<()>> =
            futures::executor::block_on(do_work.with_backoff(&mut backoff));
        assert_matches!(result.err(), Some(backoff::Error::Permanent(_)));
    }

    /// An `async fn` that succeeds on the first call yields its value.
    #[test]
    fn test_with_async_fn_when_future_succeeds() {
        async fn do_work() -> Result<u32, backoff::Error<()>> {
            Ok(123)
        }

        let mut backoff = backoff::ExponentialBackoff::default();
        let result: Result<u32, backoff::Error<()>> =
            futures::executor::block_on(do_work.with_backoff(&mut backoff));
        assert_eq!(result.ok(), Some(123));
    }

    /// Transient failures are retried until the operation succeeds, and the
    /// notification callback fires once per failed attempt.
    #[test]
    fn test_with_async_fn_when_future_fails_for_some_time() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::time::Duration;

        // `do_work` is a nested item and cannot capture locals, so the call
        // count lives in a static. An atomic keeps that free of `unsafe`.
        static CALL_COUNTER: AtomicUsize = AtomicUsize::new(0);
        const CALLS_TO_SUCCESS: usize = 5;

        async fn do_work() -> Result<u32, backoff::Error<()>> {
            // `fetch_add` returns the previous value; `+ 1` gives this call's number.
            let call_number = CALL_COUNTER.fetch_add(1, Ordering::SeqCst) + 1;
            if call_number == CALLS_TO_SUCCESS {
                Ok(123)
            } else {
                Err(backoff::Error::Transient(()))
            }
        }

        // Start both intervals at 1 ms so the retries do not slow the test down.
        let mut backoff = backoff::ExponentialBackoff::default();
        backoff.current_interval = Duration::from_millis(1);
        backoff.initial_interval = Duration::from_millis(1);

        let mut notify_counter = 0;

        // The retry loop waits with a tokio timer, so a tokio runtime drives it.
        let mut runtime = tokio::runtime::Runtime::new()
            .expect("tokio runtime creation");

        let result = runtime.block_on(do_work.with_backoff_notify(&mut backoff, |e, d| {
            notify_counter += 1;
            println!("Error {:?}, waiting for: {}", e, d.as_millis());
        }));

        assert_eq!(CALL_COUNTER.load(Ordering::SeqCst), CALLS_TO_SUCCESS);
        // Every call except the final successful one triggers a notification.
        assert_eq!(CALLS_TO_SUCCESS, notify_counter + 1);
        assert_eq!(result.ok(), Some(123));
    }
}