restartables/lib.rs
1//! Say, for example, that you want to keep pinging a URL until it returns 200, or five seconds pass.
2//! And if the URL _does_ return 200, you'd like to know how long that took.
3//!
4//! This library contains a Future wrapper named [`Restartable`](https://docs.rs/restartables/0.4.1/restartables/struct.Restartable.html). It wraps up a Future you want to retry, and it keeps retrying
5//! the future until it passes a Test you provide. If the inner future passes the Test, then the wrapper
//! resolves to your value. But if the inner future fails the Test, the wrapper will just restart the
//! future, assuming the timeout hasn't expired.
8//!
//! To do this, you need to provide three things when instantiating the wrapper:
//! - A factory, i.e. a closure which makes the future to poll, and makes a new future whenever the
//!   previous future resolved a value that failed the test.
//! - An optional timeout, after which the wrapper gives up instead of restarting the future.
//! - A test, i.e. a closure which takes the value resolved by the inner future, runs a test on it,
//!   and returns a `Result`.
13//!
14//! The wrapper will also return some metrics, i.e. how much time elapsed before the future resolved, and
15//! how many restarts were necessary.
16//!
17//! # Example
18//!
19//! ```
20//! use restartables::{Failure, Restartable};
21//! use std::future::Future;
22//! use std::pin::Pin;
23//! use std::task::{Context, Poll};
24//! use std::time::Duration;
25//!
26//! // A Future that yields a random u16 when it resolves.
27//! struct RandomNum {}
28//! impl Future for RandomNum {
29//! type Output = u16;
30//! fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
31//! cx.waker().wake_by_ref();
32//! Poll::Ready(rand::random())
33//! }
34//! }
35//!
36//! async fn print_random_even_number() {
37//! // This closure produces futures that the Restartable will poll
38//! let factory = || RandomNum {};
39//!
40//! // This test returns even numbers, and fails odd numbers.
41//! let test_is_even = |num| {
42//! if num % 2 == 0 {
43//! Ok(num)
44//! } else {
45//! Err("number wasn't even")
46//! }
47//! };
48//!
49//! // Wrap the inner `RandomNum` future into a `Restartable` future.
50//! let retrying = Restartable::new(
51//! factory,
52//! Some(Duration::from_millis(1)),
53//! test_is_even,
54//! );
55//!
56//! match retrying.await {
57//! Ok(success) => println!(
58//! "Final number was {}, which took {}us and {} restarts to get",
59//! success.value,
60//! success.duration.as_micros(),
61//! success.restarts
62//! ),
63//! Err(Failure::Timeout) => println!("Never found an even number :("),
64//! Err(Failure::Err { error, restarts }) => {
65//! println!("Error {} after {} restarts", error, restarts)
66//! }
67//! };
68//! }
69//! ```
70
71mod outcome;
72
73pub use outcome::{Failure, Success};
74use pin_project::pin_project;
75use std::future::Future;
76use std::pin::Pin;
77use std::task::{Context, Poll};
78use std::time::{Duration, Instant};
79
80/// Wraps an inner future, restarting it until it resolves a value that passes a test, or times out.
81///
82/// This is a Future adaptor, meaning it wraps other futures, like [`future::map`](https://docs.rs/futures/0.3.4/futures/future/trait.FutureExt.html#method.map)
83/// When this future is polled, it polls the inner future. If the inner futures resolves, its value
84/// is run through a `test` closure, which is of type `Fn(Future::Output) -> Result<T,E>`.
85///
86/// If the test is successful, `Restartable` will resolve to a [`Success<T>`](https://docs.rs/restartables/0.4.1/restartables/struct.Success.html).
87///
88/// If the test is unsuccessful, the future is recreated and retried. Unless the timeout has expired,
89/// in which case `Restartable` will resolve to a [`Failure<E>`](https://docs.rs/restartables/0.4.1/restartables/enum.Failure.html)
90///
91/// Because this fail-restart loop could go on forever, you should supply a timeout. If a `None`
92/// timeout is used, then awaiting the `Restartable` might never finish (because of this fail-restart
93/// loop).
94#[pin_project]
95pub struct Restartable<Fut, Test, Factory, T, E>
96where
97 Fut: Future,
98 Factory: Fn() -> Fut,
99 Test: Fn(Fut::Output) -> Result<T, E>,
100{
101 #[pin]
102 future: Fut,
103 start: Option<Instant>,
104 factory: Factory,
105 timeout: Option<Duration>,
106 test: Test,
107 restarts: usize,
108}
109
110impl<Fut, Test, Factory, T, E> Restartable<Fut, Test, Factory, T, E>
111where
112 Fut: Future,
113 Factory: Fn() -> Fut,
114 Test: Fn(Fut::Output) -> Result<T, E>,
115{
116 pub fn new(factory: Factory, timeout: Option<Duration>, test: Test) -> Self {
117 Restartable {
118 future: factory(),
119 factory,
120 timeout,
121 test,
122 start: None,
123 restarts: 0,
124 }
125 }
126}
127
128impl<Fut, Test, Factory, T, E> Future for Restartable<Fut, Test, Factory, T, E>
129where
130 Fut: Future,
131 Factory: Fn() -> Fut,
132 Test: Fn(Fut::Output) -> Result<T, E>,
133{
134 type Output = Result<Success<T>, Failure<E>>;
135
136 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
137 let mut this = self.project();
138 let start = this.start.get_or_insert_with(Instant::now);
139
140 // Call the inner poll, run the result through `self.test`.
141 let inner_poll = this.future.as_mut().poll(cx).map(this.test);
142
143 // Measure timing
144 let elapsed = start.elapsed();
145 let timed_out = if let Some(timeout) = *this.timeout {
146 elapsed > timeout
147 } else {
148 false
149 };
150
151 match (inner_poll, timed_out) {
152 // Inner future timed out without ever resolving
153 (Poll::Pending, true) => Poll::Ready(Err(Failure::Timeout)),
154 // There's still time to poll again
155 (Poll::Pending, false) => Poll::Pending,
156 // Success!
157 (Poll::Ready(Ok(resp)), _) => Poll::Ready(Ok(Success {
158 value: resp,
159 duration: elapsed,
160 restarts: *this.restarts,
161 })),
162 // Failure, but there's still time to restart the future and try again.
163 (Poll::Ready(Err(_)), false) => {
164 cx.waker().wake_by_ref();
165 let new_future = (this.factory)();
166 this.future.set(new_future);
167 *this.restarts += 1;
168 Poll::Pending
169 }
170 // Failure, and the timeout has expired, so return the failure.
171 (Poll::Ready(Err(e)), true) => Poll::Ready(Err(Failure::Err {
172 error: e,
173 restarts: *this.restarts,
174 })),
175 }
176 }
177}