hyper_srv/
lib.rs

1//! This crate provides a wrapper around Hyper's connector with ability to preresolve SRV DNS records
2//! before supplying resulting `host:port` pair to the underlying connector.
3//! The exact algorithm is as following:
4//!
5//! 1) Check if a connection destination could be (theoretically) a srv record (has no port, etc).
6//!    Use the underlying connector otherwise.
7//! 2) Try to resolve the destination host and port using provided resolver (if set). In case no
8//!    srv records has been found use the underlying connector with the origin destination.
9//! 3) Use the first record resolved to create a new destination (`A`/`AAAA`) and
10//!    finally pass it to the underlying connector.
11
12#![warn(
13    absolute_paths_not_starting_with_crate,
14    meta_variable_misuse,
15    missing_debug_implementations,
16    missing_docs,
17    noop_method_call,
18    unreachable_pub,
19    unused_crate_dependencies,
20    unused_lifetimes,
21    clippy::cast_lossless,
22    clippy::cast_possible_truncation,
23    clippy::cast_possible_wrap,
24    clippy::cast_precision_loss,
25    clippy::cast_sign_loss,
26    clippy::checked_conversions,
27    clippy::cognitive_complexity,
28    clippy::exhaustive_enums,
29    clippy::exhaustive_structs,
30    clippy::future_not_send,
31    clippy::inconsistent_struct_constructor,
32    clippy::inefficient_to_string,
33    clippy::use_debug,
34    clippy::use_self
35)]
36
37use futures::{
38    future::BoxFuture,
39    ready,
40    task::{Context, Poll},
41    Future,
42};
43use hickory_resolver::{
44    error::{ResolveError, ResolveErrorKind},
45    lookup::SrvLookup,
46    TokioAsyncResolver,
47};
48use hyper::{client::connect::Connection, service::Service, Uri};
49use std::{error::Error, fmt, pin::Pin};
50use tokio::io::{AsyncRead, AsyncWrite};
51
52/// A wrapper around Hyper's [`Connect`]or with ability to preresolve SRV DNS records
53/// before supplying resulting `host:port` pair to the underlying connector.
54///
55/// [`Connect`]: ../hyper/client/connect/trait.Connect.html
56#[derive(Debug, Clone)]
57pub struct ServiceConnector<C> {
58    resolver: Option<TokioAsyncResolver>,
59    inner: C,
60}
61
62impl<C> Service<Uri> for ServiceConnector<C>
63where
64    C: Service<Uri> + Clone + Unpin,
65    C::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
66    C::Error: Into<Box<dyn Error + Send + Sync>>,
67    C::Future: Unpin + Send,
68{
69    type Response = C::Response;
70    type Error = ServiceError;
71    type Future = ServiceConnecting<C>;
72
73    fn poll_ready(&mut self, ctx: &mut Context) -> Poll<Result<(), Self::Error>> {
74        self.inner.poll_ready(ctx).map_err(ServiceError::inner)
75    }
76
77    fn call(&mut self, uri: Uri) -> Self::Future {
78        let fut = match (&self.resolver, uri.host(), uri.port()) {
79            (Some(resolver), Some(_), None) => ServiceConnectingKind::Preresolve {
80                inner: self.inner.clone(),
81                fut: {
82                    let resolver = resolver.clone();
83                    Box::pin(async move {
84                        let host = uri.host().expect("host was right here, now it is gone");
85                        let resolved = resolver.srv_lookup(host).await;
86                        (resolved, uri)
87                    })
88                },
89            },
90            _ => ServiceConnectingKind::Inner {
91                fut: self.inner.call(uri),
92            },
93        };
94        ServiceConnecting(fut)
95    }
96}
97
98impl<C> ServiceConnector<C> {
99    /// Creates a new instance of [`ServiceConnector`] with provided connector and
100    /// optional DNS resolver. If the resolver is set to None all connections will be
101    /// handled directly by the underlying connector. This allows to toggle SRV resolving
102    /// mechanism without changing a type of connector used
103    /// in a client (as it must be named and can not even be made into a trait object).
104    ///
105    /// [`ServiceConnector`]: struct.ServiceConnector.html
106    pub fn new(inner: C, resolver: Option<TokioAsyncResolver>) -> Self {
107        Self { resolver, inner }
108    }
109}
110
111#[derive(Debug)]
112enum ServiceErrorKind {
113    Resolve(Box<ResolveError>),
114    Inner(Box<dyn Error + Send + Sync>),
115}
116
117/// An error type used in [`ServiceConnector`].
118///
119/// [`ServiceConnector`]: struct.ServiceConnector.html
120#[derive(Debug)]
121pub struct ServiceError(ServiceErrorKind);
122
123impl From<ResolveError> for ServiceError {
124    fn from(error: ResolveError) -> Self {
125        Self(ServiceErrorKind::Resolve(Box::new(error)))
126    }
127}
128
129impl fmt::Display for ServiceError {
130    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
131        match &self.0 {
132            ServiceErrorKind::Resolve(err) => fmt::Display::fmt(err, f),
133            ServiceErrorKind::Inner(err) => fmt::Display::fmt(err, f),
134        }
135    }
136}
137
138impl Error for ServiceError {
139    fn source(&self) -> Option<&(dyn Error + 'static)> {
140        match &self.0 {
141            ServiceErrorKind::Resolve(_) => None,
142            ServiceErrorKind::Inner(err) => Some(err.as_ref()),
143        }
144    }
145}
146
147impl ServiceError {
148    fn inner<E>(inner: E) -> Self
149    where
150        E: Into<Box<dyn Error + Send + Sync>>,
151    {
152        Self(ServiceErrorKind::Inner(inner.into()))
153    }
154}
155
156#[allow(clippy::large_enum_variant)]
157enum ServiceConnectingKind<C>
158where
159    C: Service<Uri> + Unpin,
160{
161    Preresolve {
162        inner: C,
163        fut: BoxFuture<'static, (Result<SrvLookup, ResolveError>, Uri)>,
164    },
165    Inner {
166        fut: C::Future,
167    },
168}
169
170impl<C> fmt::Debug for ServiceConnectingKind<C>
171where
172    C: Service<Uri> + Unpin,
173{
174    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
175        f.debug_struct("ServiceConnectingKind").finish()
176    }
177}
178
179/// This future represents a connection in progress returned by [`ServiceConnector`].
180///
181/// [`ServiceConnector`]: struct.ServiceConnector.html
182#[derive(Debug)]
183pub struct ServiceConnecting<C>(ServiceConnectingKind<C>)
184where
185    C: Service<Uri> + Unpin;
186
187impl<C> Future for ServiceConnecting<C>
188where
189    C: Service<Uri> + Unpin,
190    C::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
191    C::Error: Into<Box<dyn Error + Send + Sync>>,
192    C::Future: Unpin + Send,
193{
194    type Output = Result<C::Response, ServiceError>;
195
196    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
197        match &mut self.0 {
198            ServiceConnectingKind::Preresolve { inner, fut } => {
199                let (res, uri) = ready!(Pin::new(fut).poll(ctx));
200                let response = res.map(Some).or_else(|err| match err.kind() {
201                    ResolveErrorKind::NoRecordsFound { .. } => Ok(None),
202                    _unexpected => Err(ServiceError(ServiceErrorKind::Resolve(Box::new(err)))),
203                })?;
204                let uri = match response
205                    .as_ref()
206                    .and_then(|response| response.iter().next())
207                {
208                    Some(srv) => {
209                        let authority = format!("{}:{}", srv.target(), srv.port());
210                        let builder = Uri::builder().authority(authority.as_str());
211                        let builder = match uri.scheme() {
212                            Some(scheme) => builder.scheme(scheme.clone()),
213                            None => builder,
214                        };
215                        let builder = match uri.path_and_query() {
216                            Some(path_and_query) => builder.path_and_query(path_and_query.clone()),
217                            None => builder,
218                        };
219                        builder.build().map_err(ServiceError::inner)?
220                    }
221                    None => uri,
222                };
223                {
224                    *self = Self(ServiceConnectingKind::Inner {
225                        fut: inner.call(uri),
226                    });
227                }
228                self.poll(ctx)
229            }
230            ServiceConnectingKind::Inner { fut } => {
231                Pin::new(fut).poll(ctx).map_err(ServiceError::inner)
232            }
233        }
234    }
235}