Skip to main content

tokio_retry/
lib.rs

//! This library provides extensible asynchronous retry behaviours
//! for use with the ecosystem of [`tokio`](https://tokio.rs/) libraries.
//!
//! # Example
//!
//! ```rust,no_run
//! use tokio_retry::Retry;
//! use tokio_retry::strategy::{ExponentialBackoff, jitter};
//!
//! async fn action() -> Result<u64, ()> {
//!     // do some real-world stuff here...
//!     Err(())
//! }
//!
//! # #[tokio::main]
//! # async fn main() -> Result<(), ()> {
//! let retry_strategy = ExponentialBackoff::from_millis(10)
//!     .map(jitter) // add jitter to delays
//!     .take(3);    // limit to 3 retries
//!
//! let result = Retry::start(retry_strategy, action).await?;
//! # Ok(())
//! # }
//! ```
25
26#![no_std]
27
28use core::future::Future;
29use core::iter::{IntoIterator, Iterator};
30use core::pin::Pin;
31use core::task::{Context, Poll};
32
33use pin_project_lite::pin_project;
34use tokio::time::{sleep_until, Duration, Instant, Sleep};
35
36/// Assorted retry strategies including fixed interval and exponential back-off.
37pub mod strategy;
38
39pin_project! {
40    #[project = RetryStateProj]
41    enum RetryState<A>
42    where
43        A: Action,
44    {
45        Running {
46            #[pin]
47            future: A::Future,
48        },
49        Sleeping {
50            #[pin]
51            future: Sleep,
52        },
53    }
54}
55
56impl<A: Action> RetryState<A> {
57    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> RetryFuturePoll<A> {
58        match self.project() {
59            RetryStateProj::Running { future } => RetryFuturePoll::Running(future.poll(cx)),
60            RetryStateProj::Sleeping { future } => RetryFuturePoll::Sleeping(future.poll(cx)),
61        }
62    }
63}
64
65enum RetryFuturePoll<A>
66where
67    A: Action,
68{
69    Running(Poll<Result<A::Item, A::Error>>),
70    Sleeping(Poll<()>),
71}
72
73pin_project! {
74    /// Future that drives multiple attempts at an action via a retry strategy.
75    pub struct Retry<I, A>
76    where
77        I: Iterator<Item = Duration>,
78        A: Action,
79    {
80        #[pin]
81        retry_if: RetryIf<I, A, fn(&A::Error) -> bool>,
82    }
83}
84
85impl<I, A> Retry<I, A>
86where
87    I: Iterator<Item = Duration>,
88    A: Action,
89{
90    pub fn start<T: IntoIterator<IntoIter = I, Item = Duration>>(strategy: T, action: A) -> Self {
91        Self {
92            retry_if: RetryIf::start(strategy, action, (|_| true) as fn(&A::Error) -> bool),
93        }
94    }
95
96    #[deprecated(since = "0.3.2", note = "renamed to `start()`")]
97    pub fn spawn<T: IntoIterator<IntoIter = I, Item = Duration>>(strategy: T, action: A) -> Self {
98        Self::start(strategy, action)
99    }
100}
101
102impl<I, A> Future for Retry<I, A>
103where
104    I: Iterator<Item = Duration>,
105    A: Action,
106{
107    type Output = Result<A::Item, A::Error>;
108
109    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
110        let this = self.project();
111        this.retry_if.poll(cx)
112    }
113}
114
115pin_project! {
116    /// Future that drives multiple attempts at an action via a retry strategy. Retries are only attempted if
117    /// the `Error` returned by the future satisfies a given condition.
118    pub struct RetryIf<I, A, C>
119    where
120        I: Iterator<Item = Duration>,
121        A: Action,
122        C: Condition<A::Error>,
123    {
124        strategy: I,
125        #[pin]
126        state: RetryState<A>,
127        action: A,
128        condition: C,
129    }
130}
131
132impl<I, A, C> RetryIf<I, A, C>
133where
134    I: Iterator<Item = Duration>,
135    A: Action,
136    C: Condition<A::Error>,
137{
138    pub fn start<T: IntoIterator<IntoIter = I, Item = Duration>>(
139        strategy: T,
140        mut action: A,
141        condition: C,
142    ) -> Self {
143        Self {
144            strategy: strategy.into_iter(),
145            state: RetryState::Running {
146                future: action.run(),
147            },
148            action,
149            condition,
150        }
151    }
152
153    #[deprecated(since = "0.3.2", note = "renamed to `start()`")]
154    pub fn spawn<T: IntoIterator<IntoIter = I, Item = Duration>>(
155        strategy: T,
156        action: A,
157        condition: C,
158    ) -> Self {
159        Self::start(strategy, action, condition)
160    }
161
162    fn attempt(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<A::Item, A::Error>> {
163        let future = {
164            let this = self.as_mut().project();
165            this.action.run()
166        };
167        self.as_mut()
168            .project()
169            .state
170            .set(RetryState::Running { future });
171        self.poll(cx)
172    }
173
174    #[allow(clippy::type_complexity)]
175    fn retry(
176        mut self: Pin<&mut Self>,
177        err: A::Error,
178        cx: &mut Context<'_>,
179    ) -> Result<Poll<Result<A::Item, A::Error>>, A::Error> {
180        match self.as_mut().project().strategy.next() {
181            None => Err(err),
182            Some(duration) => {
183                let deadline = Instant::now() + duration;
184                let future = sleep_until(deadline);
185                self.as_mut()
186                    .project()
187                    .state
188                    .set(RetryState::Sleeping { future });
189                Ok(self.poll(cx))
190            }
191        }
192    }
193}
194
195impl<I, A, C> Future for RetryIf<I, A, C>
196where
197    I: Iterator<Item = Duration>,
198    A: Action,
199    C: Condition<A::Error>,
200{
201    type Output = Result<A::Item, A::Error>;
202
203    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
204        match self.as_mut().project().state.poll(cx) {
205            RetryFuturePoll::Running(poll_result) => match poll_result {
206                Poll::Ready(Ok(ok)) => Poll::Ready(Ok(ok)),
207                Poll::Pending => Poll::Pending,
208                Poll::Ready(Err(err)) => {
209                    if self.as_mut().project().condition.should_retry(&err) {
210                        match self.retry(err, cx) {
211                            Ok(poll) => poll,
212                            Err(err) => Poll::Ready(Err(err)),
213                        }
214                    } else {
215                        Poll::Ready(Err(err))
216                    }
217                }
218            },
219            RetryFuturePoll::Sleeping(poll_result) => match poll_result {
220                Poll::Pending => Poll::Pending,
221                Poll::Ready(_) => self.attempt(cx),
222            },
223        }
224    }
225}
226
/// An action can be run multiple times and produces a future.
pub trait Action {
    /// The future that this action produces.
    type Future: Future<Output = Result<Self::Item, Self::Error>>;
    /// The item that the future may resolve with.
    type Item;
    /// The error that the future may resolve with.
    type Error;

    /// Creates the future for one attempt.
    ///
    /// Called once per attempt, so every call must return a fresh future.
    fn run(&mut self) -> Self::Future;
}

/// Any closure (or function) that returns a `Result`-yielding future is an action;
/// running the action simply calls it.
impl<R, E, T, F> Action for F
where
    T: Future<Output = Result<R, E>>,
    F: FnMut() -> T,
{
    type Item = R;
    type Error = E;
    type Future = T;

    fn run(&mut self) -> Self::Future {
        self()
    }
}
248
/// Specifies under which conditions a retry is attempted.
pub trait Condition<E> {
    /// Returns `true` if the operation should be attempted again after failing with `error`.
    ///
    /// Takes `&mut self`, so an implementation may keep state between calls.
    fn should_retry(&mut self, error: &E) -> bool;
}

/// Any closure (or function) from `&E` to `bool` is a condition; checking the
/// condition simply calls it.
impl<E, F> Condition<E> for F
where
    F: FnMut(&E) -> bool,
{
    fn should_retry(&mut self, error: &E) -> bool {
        self(error)
    }
}