1use std::collections::VecDeque;
7use std::future::IntoFuture;
8use std::time::Instant;
9use std::{fmt, future::Future, marker::PhantomData, time::Duration};
10
11use futures::StreamExt;
12use futures::future::BoxFuture;
13use futures::stream::FuturesUnordered;
14use tokio::time::error::Elapsed;
15use tracing::trace;
16
/// Error produced when racing a set of connection attempts fails.
///
/// `T` is the error type returned by the individual attempts.
#[non_exhaustive]
#[derive(Debug, PartialEq, Eq)]
pub enum HappyEyeballsError<T> {
    /// The overall timeout expired before any attempt succeeded.
    ///
    /// `EyeballSet::finish` fills this with the time elapsed since the set
    /// first started waiting on an attempt, not with the configured timeout.
    Timeout(Duration),

    /// Every attempt was exhausted without recording an error, e.g. because
    /// the set contained no attempts at all.
    NoProgress,

    /// No attempt succeeded; carries the first error that an attempt reported.
    Error(T),
}
35
36impl<T> fmt::Display for HappyEyeballsError<T>
37where
38 T: fmt::Display,
39{
40 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41 match self {
42 Self::NoProgress => write!(f, "no progress can be made"),
43 Self::Error(e) => write!(f, "error: {e}"),
44 Self::Timeout(d) => write!(f, "timeout: {}ms", d.as_millis()),
45 }
46 }
47}
48
49impl<T> std::error::Error for HappyEyeballsError<T>
50where
51 T: std::error::Error + 'static,
52{
53 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
54 match self {
55 Self::Error(e) => Some(e),
56 _ => None,
57 }
58 }
59}
60
/// Shorthand for a result whose error is a [`HappyEyeballsError`] wrapping
/// the attempt error type `E`.
type HappyEyeballsResult<T, E> = Result<T, HappyEyeballsError<E>>;
62
/// Tuning knobs for an [`EyeballSet`].
///
/// The derived `Default` leaves every field as `None`.
#[derive(Debug, Default)]
pub struct EyeballConfiguration {
    /// How long to wait on the in-flight attempts before starting the next
    /// queued one. `None` waits until an in-flight attempt completes.
    pub concurrent_start_delay: Option<Duration>,
    /// Upper bound on the total time `EyeballSet::finish` may take.
    /// `None` means no overall timeout is applied.
    pub overall_timeout: Option<Duration>,
    /// Number of queued attempts started up front. `None` starts every
    /// attempt that is queued at that point.
    pub initial_concurrency: Option<usize>,
    /// Limit on in-flight attempts that is consulted before starting a
    /// further queued attempt; the initial batch is not limited by it.
    /// `None` means no limit.
    pub maximum_concurrency: Option<usize>,
}
70
/// A set of fallible attempts (futures yielding `Result<T, E>`) that are
/// raced against each other; the first successful output wins.
///
/// Attempts are queued with `push`/`extend` and driven by `finish` (or by
/// awaiting the set through its `IntoFuture` implementation).
#[derive(Debug)]
pub struct EyeballSet<F, T, E> {
    /// Attempts that have been added but not started yet, in insertion order.
    queue: VecDeque<F>,
    /// Attempts that have been started and are being polled.
    tasks: FuturesUnordered<F>,
    /// Timing and concurrency settings supplied at construction.
    config: EyeballConfiguration,
    /// Instant at which the set first began waiting on an attempt; used to
    /// report the elapsed time when the overall timeout fires.
    started: Option<Instant>,
    /// First error reported by any attempt, returned if nothing succeeds.
    error: Option<HappyEyeballsError<E>>,
    /// Ties the success type `T` to the set without storing a value of it.
    result: PhantomData<fn() -> T>,
}
89
90impl<F, T, E> EyeballSet<F, T, E> {
91 pub fn new(configuration: EyeballConfiguration) -> Self {
95 Self {
96 queue: VecDeque::new(),
97 tasks: FuturesUnordered::new(),
98 config: configuration,
99 started: None,
100 error: None,
101 result: PhantomData,
102 }
103 }
104
105 #[allow(dead_code)]
107 pub fn is_empty(&self) -> bool {
108 self.tasks.is_empty() && self.queue.is_empty()
109 }
110
111 #[allow(dead_code)]
113 pub fn len(&self) -> usize {
114 self.tasks.len() + self.queue.len()
115 }
116
117 #[allow(dead_code)]
119 pub fn push(&mut self, future: F)
120 where
121 F: Future<Output = std::result::Result<T, E>>,
122 {
123 self.queue.push_back(future);
124 }
125}
126
/// Outcome of waiting for the next in-flight attempt.
enum Eyeball<T> {
    /// An attempt completed successfully with this output.
    Ok(T),
    /// An attempt completed with an error (the error itself is recorded on
    /// the set, not carried here).
    Error,
    /// There were no in-flight attempts left to wait on.
    Exhausted,
}
132
133impl<F, T, E> EyeballSet<F, T, E>
134where
135 F: Future<Output = Result<T, E>>,
136{
137 async fn join_next(&mut self) -> Eyeball<T> {
138 self.started.get_or_insert_with(Instant::now);
139
140 match self.tasks.next().await {
141 Some(Ok(stream)) => Eyeball::Ok(stream),
142 Some(Err(e)) if self.error.is_none() => {
143 trace!("first attempt error");
144 self.error = Some(HappyEyeballsError::Error(e));
145 Eyeball::Error
146 }
147 Some(Err(_)) => {
148 trace!("attempt error");
149 Eyeball::Error
150 }
151 None => {
152 trace!("exhausted attempts");
153 Eyeball::Exhausted
154 }
155 }
156 }
157
158 async fn join_next_with_delay(&mut self) -> Result<Eyeball<T>, Elapsed> {
159 if let Some(timeout) = self.config.concurrent_start_delay {
160 tokio::time::timeout(timeout, self.join_next()).await
161 } else {
162 Ok(self.join_next().await)
163 }
164 }
165
166 async fn process_all(&mut self) -> HappyEyeballsResult<T, E> {
167 for _ in 0..self.config.initial_concurrency.unwrap_or(self.queue.len()) {
168 if let Some(future) = self.queue.pop_front() {
169 self.tasks.push(future);
170 }
171 }
172
173 loop {
174 if self.queue.is_empty() {
175 match self.join_next().await {
176 Eyeball::Ok(outcome) => return Ok(outcome),
177 Eyeball::Error => continue,
178 Eyeball::Exhausted => {
179 return self
180 .error
181 .take()
182 .map(Err)
183 .unwrap_or(Err(HappyEyeballsError::NoProgress));
184 }
185 }
186 } else {
187 if let Ok(Eyeball::Ok(output)) = self.join_next_with_delay().await {
188 return Ok(output);
189 }
190
191 if self
192 .config
193 .maximum_concurrency
194 .is_none_or(|c| self.tasks.len() < c)
195 {
196 if let Some(future) = self.queue.pop_front() {
197 self.tasks.push(future);
198 }
199 }
200 }
201 }
202 }
203
204 pub async fn finish(&mut self) -> HappyEyeballsResult<T, E> {
206 let result = match self.config.overall_timeout {
207 Some(timeout) => tokio::time::timeout(timeout, self.process_all()).await,
208 None => Ok(self.process_all().await),
209 };
210
211 match result {
212 Ok(Ok(outcome)) => Ok(outcome),
213 Ok(Err(e)) => Err(e),
214 Err(_) => Err(HappyEyeballsError::Timeout(
215 self.started.unwrap_or_else(Instant::now).elapsed(),
216 )),
217 }
218 }
219}
220
/// Boxed, type-erased future that resolves to `Result<T, E>`.
///
/// NOTE(review): nothing in the visible source constructs this type; confirm
/// whether it is still used elsewhere.
pub struct EyeballFuture<T, E>(BoxFuture<'static, Result<T, E>>);
222
223impl<T, E> fmt::Debug for EyeballFuture<T, E> {
224 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
225 f.debug_tuple("EyeballFuture").finish()
226 }
227}
228
229impl<T, E> Future for EyeballFuture<T, E> {
230 type Output = Result<T, E>;
231
232 fn poll(
233 mut self: std::pin::Pin<&mut Self>,
234 cx: &mut std::task::Context<'_>,
235 ) -> std::task::Poll<Self::Output> {
236 self.0.as_mut().poll(cx)
237 }
238}
239
240impl<F, T, E> IntoFuture for EyeballSet<F, T, E>
241where
242 T: Send + 'static,
243 E: Send + 'static,
244 F: Future<Output = Result<T, E>> + Send + 'static,
245{
246 type Output = HappyEyeballsResult<T, E>;
247 type IntoFuture = BoxFuture<'static, Self::Output>;
248
249 fn into_future(mut self) -> Self::IntoFuture {
250 Box::pin(async move { self.finish().await })
251 }
252}
253
254impl<F, T, E> Extend<F> for EyeballSet<F, T, E>
255where
256 F: Future<Output = Result<T, E>>,
257{
258 fn extend<I: IntoIterator<Item = F>>(&mut self, iter: I) {
259 self.queue.extend(iter);
260 }
261}
262
// Unit tests for `EyeballSet`, driven on a current-thread tokio runtime.
#[cfg(test)]
mod tests {
    use std::future::Pending;
    use std::future::pending;
    use std::future::ready;

    use super::*;

    /// Configuration with a zero start delay and a zero overall timeout;
    /// the concurrency fields keep their defaults.
    fn cfg_immediate() -> EyeballConfiguration {
        EyeballConfiguration {
            concurrent_start_delay: Some(Duration::ZERO),
            overall_timeout: Some(Duration::ZERO),
            ..Default::default()
        }
    }

    // Wraps an `async fn` body in a plain `#[test]` function that builds a
    // current-thread tokio runtime and blocks on the body.
    macro_rules! tokio_test {
        (async fn $fn:ident() { $($body:tt)+ }) => {
            #[test]
            fn $fn() {
                tokio::runtime::Builder::new_current_thread().enable_all()
                    .build()
                    .unwrap()
                    .block_on(async {
                        $($body)*
                    })
            }
        };
    }

    // A single attempt that succeeds yields its value.
    tokio_test! {
        async fn one_future_success() {
            let mut eyeballs = EyeballSet::new(cfg_immediate());

            let future = async { Ok::<_, String>(5) };

            eyeballs.push(future);

            assert!(!eyeballs.is_empty());

            let result = eyeballs.await;
            assert_eq!(result.unwrap(), 5);
        }
    }

    // A single attempt that fails surfaces its error as `Error`.
    tokio_test! {
        async fn one_future_error() {
            let mut eyeballs: EyeballSet<_, (), &str> = EyeballSet::new(cfg_immediate());

            let future = async { Err::<(), _>("error") };

            eyeballs.push(future);

            let result = eyeballs.await;
            assert!(matches!(
                result.unwrap_err(),
                HappyEyeballsError::Error("error")
            ));
        }
    }

    // An attempt that never resolves, combined with the zero overall
    // timeout, is reported as `Timeout`.
    tokio_test! {
        async fn one_future_timeout() {
            let mut eyeballs: EyeballSet<_, (), &str> = EyeballSet::new(cfg_immediate());

            let future = pending();
            eyeballs.push(future);

            let result = eyeballs.await;
            assert!(matches!(
                result.unwrap_err(),
                HappyEyeballsError::Timeout(_)
            ));
        }
    }

    // A set without any attempts reports `NoProgress`.
    tokio_test! {
        async fn empty_set() {
            let eyeballs: EyeballSet<Pending<Result<(), &str>>, (), &str> =
                EyeballSet::new(cfg_immediate());

            assert!(eyeballs.is_empty());
            let result = eyeballs.await;
            assert!(matches!(
                result.unwrap_err(),
                HappyEyeballsError::NoProgress
            ));
        }
    }

    // A failing attempt followed by two successful ones (added via `extend`)
    // yields the first success.
    // NOTE(review): expecting 5 rather than 10 presumably relies on the
    // in-flight set yielding ready attempts in insertion order; confirm.
    tokio_test! {
        async fn multiple_futures_success() {
            let mut eyeballs = EyeballSet::new(cfg_immediate());

            let future1 = ready(Err::<u32, String>("error".into()));
            let future2 = ready(Ok::<_, String>(5));
            let future3 = ready(Ok::<_, String>(10));

            eyeballs.extend(vec![future1, future2, future3]);
            let result = eyeballs.await;

            assert_eq!(result.unwrap(), 5);
        }
    }

    // Same scenario as above, but the attempts are added one by one with
    // `push`, and `len` is checked before the set is awaited.
    tokio_test! {
        async fn multiple_futures_until_finished() {
            let mut eyeballs = EyeballSet::new(cfg_immediate());

            let future1 = ready(Err::<u32, String>("error".into()));
            let future2 = ready(Ok::<_, String>(5));
            let future3 = ready(Ok::<_, String>(10));

            eyeballs.push(future1);
            eyeballs.push(future2);
            eyeballs.push(future3);

            assert_eq!(eyeballs.len(), 3);

            let result = eyeballs.await;

            assert_eq!(result.unwrap(), 5);
        }
    }

    // When every attempt fails, the first recorded error is the one returned.
    // NOTE(review): expecting "error 1" presumably relies on the same
    // insertion-order polling assumption; confirm.
    tokio_test! {
        async fn multiple_futures_error() {
            let mut eyeballs = EyeballSet::new(cfg_immediate());

            let future1 = ready(Err::<u32, &str>("error 1"));
            let future2 = ready(Err::<u32, &str>("error 2"));
            let future3 = ready(Err::<u32, &str>("error 3"));

            eyeballs.extend(vec![future1, future2, future3]);
            let result = eyeballs.await;

            assert!(matches!(
                result.unwrap_err(),
                HappyEyeballsError::Error("error 1")
            ));
        }
    }

    // Same all-failing scenario with the default configuration, i.e. with no
    // start delay and no overall timeout configured.
    tokio_test! {
        async fn no_timeout() {
            let mut eyeballs = EyeballSet::new(Default::default());

            let future1 = ready(Err::<u32, &str>("error 1"));
            let future2 = ready(Err::<u32, &str>("error 2"));
            let future3 = ready(Err::<u32, &str>("error 3"));

            eyeballs.extend(vec![future1, future2, future3]);

            let result = eyeballs.await;

            assert!(matches!(
                result.unwrap_err(),
                HappyEyeballsError::Error("error 1")
            ));
        }
    }
}