future_eyeballs/
lib.rs

1//! Happy Eyeballs algorithm for attempting a set of futures in parallel.
2//!
3//! This library provides a variant of a set of unordered futures which attempts
4//! each with a delay between starting. The first successful future is returned.
5
6use std::collections::VecDeque;
7use std::future::IntoFuture;
8use std::time::Instant;
9use std::{fmt, future::Future, marker::PhantomData, time::Duration};
10
11use futures::StreamExt;
12use futures::future::BoxFuture;
13use futures::stream::FuturesUnordered;
14use tokio::time::error::Elapsed;
15use tracing::trace;
16
17/// Error returned when the happy eyeballs algorithm finishes.
18///
19/// It contains the inner error if an underlying future errored
20/// (this will always be the first error)
21///
22/// Otherwsie, the enum indicates what went wrong.
23#[non_exhaustive]
24#[derive(Debug, PartialEq, Eq)]
25pub enum HappyEyeballsError<T> {
26    /// The timeout was reached.
27    Timeout(Duration),
28
29    /// No progress can be made.
30    NoProgress,
31
32    /// An error occurred during the underlying future.
33    Error(T),
34}
35
36impl<T> fmt::Display for HappyEyeballsError<T>
37where
38    T: fmt::Display,
39{
40    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41        match self {
42            Self::NoProgress => write!(f, "no progress can be made"),
43            Self::Error(e) => write!(f, "error: {e}"),
44            Self::Timeout(d) => write!(f, "timeout: {}ms", d.as_millis()),
45        }
46    }
47}
48
49impl<T> std::error::Error for HappyEyeballsError<T>
50where
51    T: std::error::Error + 'static,
52{
53    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
54        match self {
55            Self::Error(e) => Some(e),
56            _ => None,
57        }
58    }
59}
60
61type HappyEyeballsResult<T, E> = Result<T, HappyEyeballsError<E>>;
62
63#[derive(Debug, Default)]
64pub struct EyeballConfiguration {
65    pub concurrent_start_delay: Option<Duration>,
66    pub overall_timeout: Option<Duration>,
67    pub initial_concurrency: Option<usize>,
68    pub maximum_concurrency: Option<usize>,
69}
70
71/// Implements the Happy Eyeballs algorithm for connecting to a set of addresses.
72///
73/// This algorithm is used to connect to a set of addresses in parallel, with a
74/// delay between each attempt. The first successful connection is returned.
75///
76/// When the `timeout` is not set, the algorithm will attempt to connect to only
77/// one address at a time.
78///
79/// To connect to all addresses simultaneously, set the `timeout` to zero.
80#[derive(Debug)]
81pub struct EyeballSet<F, T, E> {
82    queue: VecDeque<F>,
83    tasks: FuturesUnordered<F>,
84    config: EyeballConfiguration,
85    started: Option<Instant>,
86    error: Option<HappyEyeballsError<E>>,
87    result: PhantomData<fn() -> T>,
88}
89
90impl<F, T, E> EyeballSet<F, T, E> {
91    /// Create a new `EyeballSet` with an optional timeout.
92    ///
93    /// The timeout is the amount of time between individual connection attempts.
94    pub fn new(configuration: EyeballConfiguration) -> Self {
95        Self {
96            queue: VecDeque::new(),
97            tasks: FuturesUnordered::new(),
98            config: configuration,
99            started: None,
100            error: None,
101            result: PhantomData,
102        }
103    }
104
105    /// Returns `true` if the set of tasks is empty.
106    #[allow(dead_code)]
107    pub fn is_empty(&self) -> bool {
108        self.tasks.is_empty() && self.queue.is_empty()
109    }
110
111    /// Returns the number of tasks in the set.
112    #[allow(dead_code)]
113    pub fn len(&self) -> usize {
114        self.tasks.len() + self.queue.len()
115    }
116
117    /// Push a future into the set of tasks.
118    #[allow(dead_code)]
119    pub fn push(&mut self, future: F)
120    where
121        F: Future<Output = std::result::Result<T, E>>,
122    {
123        self.queue.push_back(future);
124    }
125}
126
127enum Eyeball<T> {
128    Ok(T),
129    Error,
130    Exhausted,
131}
132
133impl<F, T, E> EyeballSet<F, T, E>
134where
135    F: Future<Output = Result<T, E>>,
136{
137    async fn join_next(&mut self) -> Eyeball<T> {
138        self.started.get_or_insert_with(Instant::now);
139
140        match self.tasks.next().await {
141            Some(Ok(stream)) => Eyeball::Ok(stream),
142            Some(Err(e)) if self.error.is_none() => {
143                trace!("first attempt error");
144                self.error = Some(HappyEyeballsError::Error(e));
145                Eyeball::Error
146            }
147            Some(Err(_)) => {
148                trace!("attempt error");
149                Eyeball::Error
150            }
151            None => {
152                trace!("exhausted attempts");
153                Eyeball::Exhausted
154            }
155        }
156    }
157
158    async fn join_next_with_delay(&mut self) -> Result<Eyeball<T>, Elapsed> {
159        if let Some(timeout) = self.config.concurrent_start_delay {
160            tokio::time::timeout(timeout, self.join_next()).await
161        } else {
162            Ok(self.join_next().await)
163        }
164    }
165
166    async fn process_all(&mut self) -> HappyEyeballsResult<T, E> {
167        for _ in 0..self.config.initial_concurrency.unwrap_or(self.queue.len()) {
168            if let Some(future) = self.queue.pop_front() {
169                self.tasks.push(future);
170            }
171        }
172
173        loop {
174            if self.queue.is_empty() {
175                match self.join_next().await {
176                    Eyeball::Ok(outcome) => return Ok(outcome),
177                    Eyeball::Error => continue,
178                    Eyeball::Exhausted => {
179                        return self
180                            .error
181                            .take()
182                            .map(Err)
183                            .unwrap_or(Err(HappyEyeballsError::NoProgress));
184                    }
185                }
186            } else {
187                if let Ok(Eyeball::Ok(output)) = self.join_next_with_delay().await {
188                    return Ok(output);
189                }
190
191                if self
192                    .config
193                    .maximum_concurrency
194                    .is_none_or(|c| self.tasks.len() < c)
195                {
196                    if let Some(future) = self.queue.pop_front() {
197                        self.tasks.push(future);
198                    }
199                }
200            }
201        }
202    }
203
204    /// Finish the happy eyeballs algorithm, returning the first successful connection.
205    pub async fn finish(&mut self) -> HappyEyeballsResult<T, E> {
206        let result = match self.config.overall_timeout {
207            Some(timeout) => tokio::time::timeout(timeout, self.process_all()).await,
208            None => Ok(self.process_all().await),
209        };
210
211        match result {
212            Ok(Ok(outcome)) => Ok(outcome),
213            Ok(Err(e)) => Err(e),
214            Err(_) => Err(HappyEyeballsError::Timeout(
215                self.started.unwrap_or_else(Instant::now).elapsed(),
216            )),
217        }
218    }
219}
220
221pub struct EyeballFuture<T, E>(BoxFuture<'static, Result<T, E>>);
222
223impl<T, E> fmt::Debug for EyeballFuture<T, E> {
224    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
225        f.debug_tuple("EyeballFuture").finish()
226    }
227}
228
229impl<T, E> Future for EyeballFuture<T, E> {
230    type Output = Result<T, E>;
231
232    fn poll(
233        mut self: std::pin::Pin<&mut Self>,
234        cx: &mut std::task::Context<'_>,
235    ) -> std::task::Poll<Self::Output> {
236        self.0.as_mut().poll(cx)
237    }
238}
239
240impl<F, T, E> IntoFuture for EyeballSet<F, T, E>
241where
242    T: Send + 'static,
243    E: Send + 'static,
244    F: Future<Output = Result<T, E>> + Send + 'static,
245{
246    type Output = HappyEyeballsResult<T, E>;
247    type IntoFuture = BoxFuture<'static, Self::Output>;
248
249    fn into_future(mut self) -> Self::IntoFuture {
250        Box::pin(async move { self.finish().await })
251    }
252}
253
254impl<F, T, E> Extend<F> for EyeballSet<F, T, E>
255where
256    F: Future<Output = Result<T, E>>,
257{
258    fn extend<I: IntoIterator<Item = F>>(&mut self, iter: I) {
259        self.queue.extend(iter);
260    }
261}
262
263#[cfg(test)]
264mod tests {
265    use std::future::Pending;
266    use std::future::pending;
267    use std::future::ready;
268
269    use super::*;
270
271    fn cfg_immediate() -> EyeballConfiguration {
272        EyeballConfiguration {
273            concurrent_start_delay: Some(Duration::ZERO),
274            overall_timeout: Some(Duration::ZERO),
275            ..Default::default()
276        }
277    }
278
279    macro_rules! tokio_test {
280        (async fn $fn:ident() { $($body:tt)+ }) => {
281            #[test]
282            fn $fn() {
283                tokio::runtime::Builder::new_current_thread().enable_all()
284                        .build()
285                        .unwrap()
286                        .block_on(async {
287                            $($body)*
288                        })
289            }
290        };
291    }
292
293    tokio_test! {
294    async fn one_future_success() {
295        let mut eyeballs = EyeballSet::new(cfg_immediate());
296
297        let future = async { Ok::<_, String>(5) };
298
299        eyeballs.push(future);
300
301        assert!(!eyeballs.is_empty());
302
303        let result = eyeballs.await;
304        assert_eq!(result.unwrap(), 5);
305    }}
306
307    tokio_test! {
308    async fn one_future_error() {
309        let mut eyeballs: EyeballSet<_, (), &str> = EyeballSet::new(cfg_immediate());
310
311        let future = async { Err::<(), _>("error") };
312
313        eyeballs.push(future);
314
315        let result = eyeballs.await;
316        assert!(matches!(
317            result.unwrap_err(),
318            HappyEyeballsError::Error("error")
319        ));
320    }
321    }
322
323    tokio_test! {
324    async fn one_future_timeout() {
325        let mut eyeballs: EyeballSet<_, (), &str> = EyeballSet::new(cfg_immediate());
326
327        let future = pending();
328        eyeballs.push(future);
329
330        let result = eyeballs.await;
331        assert!(matches!(
332            result.unwrap_err(),
333            HappyEyeballsError::Timeout(_)
334        ));
335    }
336    }
337
338    tokio_test! {
339    async fn empty_set() {
340        let eyeballs: EyeballSet<Pending<Result<(), &str>>, (), &str> =
341            EyeballSet::new(cfg_immediate());
342
343        assert!(eyeballs.is_empty());
344        let result = eyeballs.await;
345        assert!(matches!(
346            result.unwrap_err(),
347            HappyEyeballsError::NoProgress
348        ));
349    }
350    }
351
352    tokio_test! {
353    async fn multiple_futures_success() {
354        let mut eyeballs = EyeballSet::new(cfg_immediate());
355
356        let future1 = ready(Err::<u32, String>("error".into()));
357        let future2 = ready(Ok::<_, String>(5));
358        let future3 = ready(Ok::<_, String>(10));
359
360        eyeballs.extend(vec![future1, future2, future3]);
361        let result = eyeballs.await;
362
363        assert_eq!(result.unwrap(), 5);
364    }
365    }
366
367    tokio_test! {
368    async fn multiple_futures_until_finished() {
369        let mut eyeballs = EyeballSet::new(cfg_immediate());
370
371        let future1 = ready(Err::<u32, String>("error".into()));
372        let future2 = ready(Ok::<_, String>(5));
373        let future3 = ready(Ok::<_, String>(10));
374
375        eyeballs.push(future1);
376        eyeballs.push(future2);
377        eyeballs.push(future3);
378
379        assert_eq!(eyeballs.len(), 3);
380
381        let result = eyeballs.await;
382
383        assert_eq!(result.unwrap(), 5);
384    }
385    }
386
387    tokio_test! {
388    async fn multiple_futures_error() {
389        let mut eyeballs = EyeballSet::new(cfg_immediate());
390
391        let future1 = ready(Err::<u32, &str>("error 1"));
392        let future2 = ready(Err::<u32, &str>("error 2"));
393        let future3 = ready(Err::<u32, &str>("error 3"));
394
395        eyeballs.extend(vec![future1, future2, future3]);
396        let result = eyeballs.await;
397
398        assert!(matches!(
399            result.unwrap_err(),
400            HappyEyeballsError::Error("error 1")
401        ));
402    }
403    }
404
405    tokio_test! {
406    async fn no_timeout() {
407        let mut eyeballs = EyeballSet::new(Default::default());
408
409        let future1 = ready(Err::<u32, &str>("error 1"));
410        let future2 = ready(Err::<u32, &str>("error 2"));
411        let future3 = ready(Err::<u32, &str>("error 3"));
412
413        eyeballs.extend(vec![future1, future2, future3]);
414
415        let result = eyeballs.await;
416
417        assert!(matches!(
418            result.unwrap_err(),
419            HappyEyeballsError::Error("error 1")
420        ));
421    }
422    }
423}