1use std::{
2 future::Future,
3 pin::Pin,
4 task::{Context, Poll},
5 time::Duration,
6};
7
8use futures_core::ready;
9use pin_project_lite::pin_project;
10
11use crate::{backoff::Backoff, error::Error};
12
13use crate::retry::{NoopNotify, Notify};
14
15pub trait Sleeper {
16 type Sleep: Future<Output = ()> + Send + 'static;
17 fn sleep(&self, dur: Duration) -> Self::Sleep;
18}
19
20#[cfg(any(feature = "tokio", feature = "async-std"))]
42pub fn retry<I, E, Fn, Fut, B>(
43 backoff: B,
44 operation: Fn,
45) -> Retry<impl Sleeper, B, NoopNotify, Fn, Fut>
46where
47 B: Backoff,
48 Fn: FnMut() -> Fut,
49 Fut: Future<Output = Result<I, Error<E>>>,
50{
51 retry_notify(backoff, operation, NoopNotify)
52}
53
54#[cfg(any(feature = "tokio", feature = "async-std"))]
93pub fn retry_notify<I, E, Fn, Fut, B, N>(
94 mut backoff: B,
95 operation: Fn,
96 notify: N,
97) -> Retry<impl Sleeper, B, N, Fn, Fut>
98where
99 B: Backoff,
100 Fn: FnMut() -> Fut,
101 Fut: Future<Output = Result<I, Error<E>>>,
102 N: Notify<E>,
103{
104 backoff.reset();
105 Retry::new(rt_sleeper(), backoff, notify, operation)
106}
107
108pin_project! {
109 pub struct Retry<S: Sleeper, B, N, Fn, Fut> {
111 sleeper: S,
113
114 backoff: B,
116
117 #[pin]
119 delay: OptionPinned<S::Sleep>,
120
121 operation: Fn,
123
124 #[pin]
126 fut: Fut,
127
128 notify: N,
130 }
131}
132
133impl<S, B, N, Fn, Fut, I, E> Retry<S, B, N, Fn, Fut>
134where
135 S: Sleeper,
136 Fn: FnMut() -> Fut,
137 Fut: Future<Output = Result<I, Error<E>>>,
138{
139 pub fn new(sleeper: S, backoff: B, notify: N, mut operation: Fn) -> Self {
140 let fut = operation();
141 Retry {
142 sleeper,
143 backoff,
144 delay: OptionPinned::None,
145 operation,
146 fut,
147 notify,
148 }
149 }
150}
151
152pin_project! {
153 #[project = OptionProj]
154 enum OptionPinned<T> {
155 Some {
156 #[pin]
157 inner: T,
158 },
159 None,
160 }
161}
162
163impl<S, B, N, Fn, Fut, I, E> Future for Retry<S, B, N, Fn, Fut>
164where
165 S: Sleeper,
166 B: Backoff,
167 N: Notify<E>,
168 Fn: FnMut() -> Fut,
169 Fut: Future<Output = Result<I, Error<E>>>,
170{
171 type Output = Result<I, E>;
172
173 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
174 let mut this = self.project();
175
176 loop {
177 if let OptionProj::Some { inner: delay } = this.delay.as_mut().project() {
178 ready!(delay.poll(cx));
179 this.delay.set(OptionPinned::None);
180 }
181
182 match ready!(this.fut.as_mut().poll(cx)) {
183 Ok(v) => return Poll::Ready(Ok(v)),
184 Err(Error::Permanent(e)) => return Poll::Ready(Err(e)),
185 Err(Error::Transient { err, retry_after }) => {
186 match retry_after.or_else(|| this.backoff.next_backoff()) {
187 Some(duration) => {
188 this.notify.notify(err, duration);
189 this.delay.set(OptionPinned::Some {
190 inner: this.sleeper.sleep(duration),
191 });
192 this.fut.set((this.operation)());
193 }
194 None => return Poll::Ready(Err(err)),
195 }
196 }
197 }
198 }
199 }
200}
201
202#[cfg(all(feature = "tokio", feature = "async-std"))]
203compile_error!("Feature \"tokio\" and \"async-std\" cannot be enabled at the same time");
204
205#[cfg(feature = "async-std")]
206fn rt_sleeper() -> impl Sleeper {
207 AsyncStdSleeper
208}
209
210#[cfg(feature = "tokio")]
211fn rt_sleeper() -> impl Sleeper {
212 TokioSleeper
213}
214
215#[cfg(feature = "tokio")]
216#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
217struct TokioSleeper;
218#[cfg(feature = "tokio")]
219#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
220impl Sleeper for TokioSleeper {
221 type Sleep = ::tokio_1::time::Sleep;
222 fn sleep(&self, dur: Duration) -> Self::Sleep {
223 ::tokio_1::time::sleep(dur)
224 }
225}
226
227#[cfg(feature = "async-std")]
228#[cfg_attr(docsrs, doc(cfg(feature = "async-std")))]
229struct AsyncStdSleeper;
230
231#[cfg(feature = "async-std")]
232#[cfg_attr(docsrs, doc(cfg(feature = "async-std")))]
233impl Sleeper for AsyncStdSleeper {
234 type Sleep = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
235 fn sleep(&self, dur: Duration) -> Self::Sleep {
236 Box::pin(::async_std_1::task::sleep(dur))
237 }
238}