1#![no_std]
27
28use core::future::Future;
29use core::iter::{IntoIterator, Iterator};
30use core::pin::Pin;
31use core::task::{Context, Poll};
32
33use pin_project_lite::pin_project;
34use tokio::time::{sleep_until, Duration, Instant, Sleep};
35
36pub mod strategy;
38
39pin_project! {
40 #[project = RetryStateProj]
41 enum RetryState<A>
42 where
43 A: Action,
44 {
45 Running {
46 #[pin]
47 future: A::Future,
48 },
49 Sleeping {
50 #[pin]
51 future: Sleep,
52 },
53 }
54}
55
56impl<A: Action> RetryState<A> {
57 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> RetryFuturePoll<A> {
58 match self.project() {
59 RetryStateProj::Running { future } => RetryFuturePoll::Running(future.poll(cx)),
60 RetryStateProj::Sleeping { future } => RetryFuturePoll::Sleeping(future.poll(cx)),
61 }
62 }
63}
64
65enum RetryFuturePoll<A>
66where
67 A: Action,
68{
69 Running(Poll<Result<A::Item, A::Error>>),
70 Sleeping(Poll<()>),
71}
72
73pin_project! {
74 pub struct Retry<I, A>
76 where
77 I: Iterator<Item = Duration>,
78 A: Action,
79 {
80 #[pin]
81 retry_if: RetryIf<I, A, fn(&A::Error) -> bool>,
82 }
83}
84
85impl<I, A> Retry<I, A>
86where
87 I: Iterator<Item = Duration>,
88 A: Action,
89{
90 pub fn start<T: IntoIterator<IntoIter = I, Item = Duration>>(strategy: T, action: A) -> Self {
91 Self {
92 retry_if: RetryIf::start(strategy, action, (|_| true) as fn(&A::Error) -> bool),
93 }
94 }
95
96 #[deprecated(since = "0.3.2", note = "renamed to `start()`")]
97 pub fn spawn<T: IntoIterator<IntoIter = I, Item = Duration>>(strategy: T, action: A) -> Self {
98 Self::start(strategy, action)
99 }
100}
101
102impl<I, A> Future for Retry<I, A>
103where
104 I: Iterator<Item = Duration>,
105 A: Action,
106{
107 type Output = Result<A::Item, A::Error>;
108
109 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
110 let this = self.project();
111 this.retry_if.poll(cx)
112 }
113}
114
115pin_project! {
116 pub struct RetryIf<I, A, C>
119 where
120 I: Iterator<Item = Duration>,
121 A: Action,
122 C: Condition<A::Error>,
123 {
124 strategy: I,
125 #[pin]
126 state: RetryState<A>,
127 action: A,
128 condition: C,
129 }
130}
131
132impl<I, A, C> RetryIf<I, A, C>
133where
134 I: Iterator<Item = Duration>,
135 A: Action,
136 C: Condition<A::Error>,
137{
138 pub fn start<T: IntoIterator<IntoIter = I, Item = Duration>>(
139 strategy: T,
140 mut action: A,
141 condition: C,
142 ) -> Self {
143 Self {
144 strategy: strategy.into_iter(),
145 state: RetryState::Running {
146 future: action.run(),
147 },
148 action,
149 condition,
150 }
151 }
152
153 #[deprecated(since = "0.3.2", note = "renamed to `start()`")]
154 pub fn spawn<T: IntoIterator<IntoIter = I, Item = Duration>>(
155 strategy: T,
156 action: A,
157 condition: C,
158 ) -> Self {
159 Self::start(strategy, action, condition)
160 }
161
162 fn attempt(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<A::Item, A::Error>> {
163 let future = {
164 let this = self.as_mut().project();
165 this.action.run()
166 };
167 self.as_mut()
168 .project()
169 .state
170 .set(RetryState::Running { future });
171 self.poll(cx)
172 }
173
174 #[allow(clippy::type_complexity)]
175 fn retry(
176 mut self: Pin<&mut Self>,
177 err: A::Error,
178 cx: &mut Context<'_>,
179 ) -> Result<Poll<Result<A::Item, A::Error>>, A::Error> {
180 match self.as_mut().project().strategy.next() {
181 None => Err(err),
182 Some(duration) => {
183 let deadline = Instant::now() + duration;
184 let future = sleep_until(deadline);
185 self.as_mut()
186 .project()
187 .state
188 .set(RetryState::Sleeping { future });
189 Ok(self.poll(cx))
190 }
191 }
192 }
193}
194
195impl<I, A, C> Future for RetryIf<I, A, C>
196where
197 I: Iterator<Item = Duration>,
198 A: Action,
199 C: Condition<A::Error>,
200{
201 type Output = Result<A::Item, A::Error>;
202
203 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
204 match self.as_mut().project().state.poll(cx) {
205 RetryFuturePoll::Running(poll_result) => match poll_result {
206 Poll::Ready(Ok(ok)) => Poll::Ready(Ok(ok)),
207 Poll::Pending => Poll::Pending,
208 Poll::Ready(Err(err)) => {
209 if self.as_mut().project().condition.should_retry(&err) {
210 match self.retry(err, cx) {
211 Ok(poll) => poll,
212 Err(err) => Poll::Ready(Err(err)),
213 }
214 } else {
215 Poll::Ready(Err(err))
216 }
217 }
218 },
219 RetryFuturePoll::Sleeping(poll_result) => match poll_result {
220 Poll::Pending => Poll::Pending,
221 Poll::Ready(_) => self.attempt(cx),
222 },
223 }
224 }
225}
226
227pub trait Action {
229 type Future: Future<Output = Result<Self::Item, Self::Error>>;
231 type Item;
233 type Error;
235
236 fn run(&mut self) -> Self::Future;
237}
238
239impl<R, E, T: Future<Output = Result<R, E>>, F: FnMut() -> T> Action for F {
240 type Item = R;
241 type Error = E;
242 type Future = T;
243
244 fn run(&mut self) -> Self::Future {
245 self()
246 }
247}
248
249pub trait Condition<E> {
251 fn should_retry(&mut self, error: &E) -> bool;
252}
253
254impl<E, F: FnMut(&E) -> bool> Condition<E> for F {
255 fn should_retry(&mut self, error: &E) -> bool {
256 self(error)
257 }
258}