1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
//! Background resolver.
//!
//! This module contains utilities to aid with implementing resolvers that run
//! in the background in a [`tokio`] thread pool.
//!
//! You can provide a custom [`Resolve`] trait implementation that uses either
//! [`tokio::spawn`], [`tokio::task::spawn_blocking`] or anything else that
//! returns [`tokio::task::JoinHandle`].

use std::{
    fmt,
    future::Future,
    io,
    net::SocketAddr,
    pin::Pin,
    task::{self, Poll},
};

use hyper::client::connect::dns::Name;
use tokio::task::JoinHandle;
use tower_service::Service;

/// Resolve the name in the background.
pub trait Resolve {
    /// An iterator type used to enumerate the resolved addresses.
    type Iter: Iterator<Item = SocketAddr>;

    /// Perform the name resolution.
    fn resolve(&mut self, name: Name) -> JoinHandle<io::Result<Self::Iter>>;
}

/// Resolver adapter that can be plugged into [`hyper`].
///
/// It holds a single value of the generic type `T`, which supplies the
/// actual resolution logic; cloning the adapter clones that value.
#[derive(Clone)]
pub struct Resolver<T> {
    // The wrapped resolution implementation.
    inner: T,
}

impl<T> Resolver<T> {
    /// Create a new [`Resolver`] backed by `inner` resolution implementation.
    pub fn new(inner: T) -> Self {
        Self { inner }
    }

    /// Consume [`Resolver`] and return the wrapped `T`.
    pub fn into_inner(self) -> T {
        self.inner
    }
}

/// Borrow the wrapped resolution implementation without consuming the
/// [`Resolver`].
impl<T> AsRef<T> for Resolver<T> {
    fn as_ref(&self) -> &T {
        // Destructuring through `&self` binds `inner` as `&T`.
        let Self { inner } = self;
        inner
    }
}

/// Expose the wrapped [`Resolve`] implementation as a [`Service`] taking a
/// [`Name`] and producing the iterator of resolved addresses.
impl<T> Service<Name> for Resolver<T>
where
    T: Resolve + Send + Sync + 'static,
{
    type Response = T::Iter;
    type Error = io::Error;
    type Future = ResolverFuture<T::Iter>;

    /// Always reports readiness; no state is inspected before a call.
    fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Ask the wrapped implementation to resolve `name` and wrap the
    /// resulting join handle into a [`ResolverFuture`].
    fn call(&mut self, name: Name) -> Self::Future {
        ResolverFuture {
            inner: self.inner.resolve(name),
        }
    }
}

/// Opaque debug output: only the type name is printed, so `T` does not
/// need to implement [`fmt::Debug`].
impl<T> fmt::Debug for Resolver<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `str`'s `Display` honours width/fill/precision flags via `pad`.
        fmt::Display::fmt("Resolver", f)
    }
}

/// The opaque resolver future.
///
/// Ready when the underlying background resolution is ready.
/// Propagates panics from the underlying task.
pub struct ResolverFuture<T> {
    inner: JoinHandle<io::Result<T>>,
}

/// Drive the background task's join handle and translate its outcome.
impl<T> Future for ResolverFuture<T> {
    type Output = io::Result<T>;

    /// Poll the wrapped join handle.
    ///
    /// * A finished task yields its own `io::Result` unchanged.
    /// * A cancelled task is reported as an [`io::ErrorKind::Interrupted`]
    ///   error wrapping the join error.
    ///
    /// # Panics
    ///
    /// Panics when the join error is anything other than a cancellation.
    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let joined = match Pin::new(&mut self.inner).poll(cx) {
            Poll::Ready(joined) => joined,
            Poll::Pending => return Poll::Pending,
        };

        let outcome = match joined {
            // The task ran to completion: forward whatever it returned.
            Ok(resolved) => resolved,
            Err(join_err) if join_err.is_cancelled() => {
                Err(io::Error::new(io::ErrorKind::Interrupted, join_err))
            }
            Err(join_err) => panic!("resolver Resolver task failed: {:?}", join_err),
        };

        Poll::Ready(outcome)
    }
}

/// Opaque debug output: only the type name is printed, so `T` does not
/// need to implement [`fmt::Debug`].
impl<T> fmt::Debug for ResolverFuture<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `str`'s `Display` honours width/fill/precision flags via `pad`.
        fmt::Display::fmt("ResolverFuture", f)
    }
}