1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
//! Say, for example, that you want to keep pinging a URL until it returns 200, or five seconds pass. //! And if the URL _does_ return 200, you'd like to know how long that took. //! //! This library contains a Future wrapper. It wraps up a Future you want to retry, and it keeps retrying //! the future until it passes a Test you provide. If the inner future passes the Test, then the wrapper //! resolves your value. But if the inner future fails the Test, the wrapper will just restart the future. //! Assuming the timeout hasn't expired. //! //! To do this, you need to provide three things when instantiating the wrapper: //! - A future to poll //! - A test, i.e. a closure which takes values from the inner future, runs a test on it, and returns Result //! - A factory to make new futures if the previous future resolved a value that failed the test. //! //! The wrapper will also return some metrics, i.e. how much time elapsed before the future resolved, and //! how many restarts were necessary. //! //! If the future you're using is from [`reqwest`](https://docs.rs/reqwest), consider using the [`reqw`](reqw/index.html) //! module to simplify setup. This requires the `use_reqwest` feature. //! //! # Example //! //! ``` //! use restartables::{Failure, Restartable}; //! use std::future::Future; //! use std::pin::Pin; //! use std::task::{Context, Poll}; //! use std::time::Duration; //! //! // A Future that yields a random u16 when it resolves. //! struct RandomNum {} //! impl Future for RandomNum { //! type Output = u16; //! fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { //! cx.waker().wake_by_ref(); //! Poll::Ready(rand::random()) //! } //! } //! //! fn print_random_even_number() { //! // This closure produces futures that the Restartable will poll //! let factory = || RandomNum {}; //! //! // This test returns even numbers, and fails odd numbers. //! let test_is_even = |num| { //! if num % 2 == 0 { //! Ok(num) //! } else { //! Err("number wasn't even") //! } //! }; //! //! // Wrap the inner `RandomNum` future into a `Restartable` future. //! let retrying = Restartable::new( //! factory, //! Some(Duration::from_millis(1)), //! test_is_even, //! ); //! //! match retrying.await { //! Ok(success) => println!( //! "Final number was {}, which took {}us and {} restarts to get", //! success.value, //! success.duration.as_micros(), //! success.restarts //! ), //! Err(Failure::Timeout) => println!("Never found an even number :("), //! Err(Failure::Err { error, restarts }) => { //! println!("Error {} after {} restarts", error, restarts) //! } //! }; //! } //! ``` mod outcome; #[cfg(feature = "use_reqwest")] pub mod reqw; pub use outcome::{Failure, Success}; use pin_project::pin_project; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; /// Wraps an inner future, restarting it until it resolves a value that passes a test, or times out. /// /// This is a Future adaptor, meaning it wraps other futures, like [`future::map`](https://docs.rs/futures/0.3.4/futures/future/trait.FutureExt.html#method.map) /// When this future is polled, it polls the inner future. If the inner futures resolves, its value /// is run through a `test` closure. /// /// If the test is successful, the value is returned with timing information. /// If the test is unsuccessful, the future is recreated and retried. /// Because this fail-restart loop could go on forever, you should supply a timeout. If a `None` /// timeout is used, then awaiting the `Restartable` is not guaranteed to resolve. #[pin_project] pub struct Restartable<Fut, Test, Factory, T, E> where Fut: Future, Factory: Fn() -> Fut, Test: Fn(Fut::Output) -> Result<T, E>, { #[pin] future: Fut, start: Option<Instant>, factory: Factory, timeout: Option<Duration>, test: Test, restarts: usize, } impl<Fut, Test, Factory, T, E> Restartable<Fut, Test, Factory, T, E> where Fut: Future, Factory: Fn() -> Fut, Test: Fn(Fut::Output) -> Result<T, E>, { pub fn new(factory: Factory, timeout: Option<Duration>, test: Test) -> Self { Restartable { future: factory(), factory, timeout, test, start: None, restarts: 0, } } } impl<Fut, Test, Factory, T, E> Future for Restartable<Fut, Test, Factory, T, E> where Fut: Future, Factory: Fn() -> Fut, Test: Fn(Fut::Output) -> Result<T, E>, { type Output = Result<Success<T>, Failure<E>>; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { let mut this = self.project(); let start = this.start.get_or_insert_with(Instant::now); // Call the inner poll, run the result through `self.test`. let inner_poll = this.future.as_mut().poll(cx).map(this.test); // Measure timing let elapsed = start.elapsed(); let timed_out = if let Some(timeout) = *this.timeout { elapsed > timeout } else { false }; match (inner_poll, timed_out) { // Inner future timed out without ever resolving (Poll::Pending, true) => Poll::Ready(Err(Failure::Timeout)), // There's still time to poll again (Poll::Pending, false) => Poll::Pending, // Success! (Poll::Ready(Ok(resp)), _) => Poll::Ready(Ok(Success { value: resp, duration: elapsed, restarts: *this.restarts, })), // Failure, but there's still time to restart the future and try again. (Poll::Ready(Err(_)), false) => { cx.waker().wake_by_ref(); let new_future = (this.factory)(); this.future.set(new_future); *this.restarts += 1; Poll::Pending } // Failure, and the timeout has expired, so return the failure. (Poll::Ready(Err(e)), true) => Poll::Ready(Err(Failure::Err { error: e, restarts: *this.restarts, })), } } }