restartables/
lib.rs

1//! Say, for example, that you want to keep pinging a URL until it returns 200, or five seconds pass.
2//! And if the URL _does_ return 200, you'd like to know how long that took.
3//!
4//! This library contains a Future wrapper named [`Restartable`](https://docs.rs/restartables/0.4.1/restartables/struct.Restartable.html). It wraps up a Future you want to retry, and it keeps retrying
5//! the future until it passes a Test you provide. If the inner future passes the Test, then the wrapper
6//! resolves your value. But if the inner future fails the Test, the wrapper will just restart the future.
7//! Assuming the timeout hasn't expired.
8//!
9//! To do this, you need to provide three things when instantiating the wrapper:
10//! - A future to poll
11//! - A test, i.e. a closure which takes values from the inner future, runs a test on it, and returns Result
12//! - A factory to make new futures if the previous future resolved a value that failed the test.
13//!
14//! The wrapper will also return some metrics, i.e. how much time elapsed before the future resolved, and
15//! how many restarts were necessary.
16//!
17//! # Example
18//!
19//! ```
20//! use restartables::{Failure, Restartable};
21//! use std::future::Future;
22//! use std::pin::Pin;
23//! use std::task::{Context, Poll};
24//! use std::time::Duration;
25//!
26//! // A Future that yields a random u16 when it resolves.
27//! struct RandomNum {}
28//! impl Future for RandomNum {
29//!     type Output = u16;
30//!     fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
31//!         cx.waker().wake_by_ref();
32//!         Poll::Ready(rand::random())
33//!     }
34//! }
35//!
36//! async fn print_random_even_number() {
37//!     // This closure produces futures that the Restartable will poll
38//!     let factory = || RandomNum {};
39//!
40//!     // This test returns even numbers, and fails odd numbers.
41//!     let test_is_even = |num| {
42//!         if num % 2 == 0 {
43//!             Ok(num)
44//!         } else {
45//!             Err("number wasn't even")
46//!         }
47//!     };
48//!
49//!     // Wrap the inner `RandomNum` future into a `Restartable` future.
50//!     let retrying = Restartable::new(
51//!         factory,
52//!         Some(Duration::from_millis(1)),
53//!         test_is_even,
54//!     );
55//!
56//!     match retrying.await {
57//!         Ok(success) => println!(
58//!             "Final number was {}, which took {}us and {} restarts to get",
59//!             success.value,
60//!             success.duration.as_micros(),
61//!             success.restarts
62//!         ),
63//!         Err(Failure::Timeout) => println!("Never found an even number :("),
64//!         Err(Failure::Err { error, restarts }) => {
65//!             println!("Error {} after {} restarts", error, restarts)
66//!         }
67//!     };
68//! }
69//! ```
70
71mod outcome;
72
73pub use outcome::{Failure, Success};
74use pin_project::pin_project;
75use std::future::Future;
76use std::pin::Pin;
77use std::task::{Context, Poll};
78use std::time::{Duration, Instant};
79
80/// Wraps an inner future, restarting it until it resolves a value that passes a test, or times out.
81///
82/// This is a Future adaptor, meaning it wraps other futures, like [`future::map`](https://docs.rs/futures/0.3.4/futures/future/trait.FutureExt.html#method.map)
83/// When this future is polled, it polls the inner future. If the inner futures resolves, its value
84/// is run through a `test` closure, which is of type `Fn(Future::Output) -> Result<T,E>`.
85///
86/// If the test is successful, `Restartable` will resolve to a [`Success<T>`](https://docs.rs/restartables/0.4.1/restartables/struct.Success.html).
87///
88/// If the test is unsuccessful, the future is recreated and retried. Unless the timeout has expired,
89/// in which case `Restartable` will resolve to a [`Failure<E>`](https://docs.rs/restartables/0.4.1/restartables/enum.Failure.html)
90///
91/// Because this fail-restart loop could go on forever, you should supply a timeout. If a `None`
92/// timeout is used, then awaiting the `Restartable` might never finish (because of this fail-restart
93/// loop).
94#[pin_project]
95pub struct Restartable<Fut, Test, Factory, T, E>
96where
97    Fut: Future,
98    Factory: Fn() -> Fut,
99    Test: Fn(Fut::Output) -> Result<T, E>,
100{
101    #[pin]
102    future: Fut,
103    start: Option<Instant>,
104    factory: Factory,
105    timeout: Option<Duration>,
106    test: Test,
107    restarts: usize,
108}
109
110impl<Fut, Test, Factory, T, E> Restartable<Fut, Test, Factory, T, E>
111where
112    Fut: Future,
113    Factory: Fn() -> Fut,
114    Test: Fn(Fut::Output) -> Result<T, E>,
115{
116    pub fn new(factory: Factory, timeout: Option<Duration>, test: Test) -> Self {
117        Restartable {
118            future: factory(),
119            factory,
120            timeout,
121            test,
122            start: None,
123            restarts: 0,
124        }
125    }
126}
127
128impl<Fut, Test, Factory, T, E> Future for Restartable<Fut, Test, Factory, T, E>
129where
130    Fut: Future,
131    Factory: Fn() -> Fut,
132    Test: Fn(Fut::Output) -> Result<T, E>,
133{
134    type Output = Result<Success<T>, Failure<E>>;
135
136    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
137        let mut this = self.project();
138        let start = this.start.get_or_insert_with(Instant::now);
139
140        // Call the inner poll, run the result through `self.test`.
141        let inner_poll = this.future.as_mut().poll(cx).map(this.test);
142
143        // Measure timing
144        let elapsed = start.elapsed();
145        let timed_out = if let Some(timeout) = *this.timeout {
146            elapsed > timeout
147        } else {
148            false
149        };
150
151        match (inner_poll, timed_out) {
152            // Inner future timed out without ever resolving
153            (Poll::Pending, true) => Poll::Ready(Err(Failure::Timeout)),
154            // There's still time to poll again
155            (Poll::Pending, false) => Poll::Pending,
156            // Success!
157            (Poll::Ready(Ok(resp)), _) => Poll::Ready(Ok(Success {
158                value: resp,
159                duration: elapsed,
160                restarts: *this.restarts,
161            })),
162            // Failure, but there's still time to restart the future and try again.
163            (Poll::Ready(Err(_)), false) => {
164                cx.waker().wake_by_ref();
165                let new_future = (this.factory)();
166                this.future.set(new_future);
167                *this.restarts += 1;
168                Poll::Pending
169            }
170            // Failure, and the timeout has expired, so return the failure.
171            (Poll::Ready(Err(e)), true) => Poll::Ready(Err(Failure::Err {
172                error: e,
173                restarts: *this.restarts,
174            })),
175        }
176    }
177}