1#![warn(
13 absolute_paths_not_starting_with_crate,
14 meta_variable_misuse,
15 missing_debug_implementations,
16 missing_docs,
17 noop_method_call,
18 unreachable_pub,
19 unused_crate_dependencies,
20 unused_lifetimes,
21 clippy::cast_lossless,
22 clippy::cast_possible_truncation,
23 clippy::cast_possible_wrap,
24 clippy::cast_precision_loss,
25 clippy::cast_sign_loss,
26 clippy::checked_conversions,
27 clippy::cognitive_complexity,
28 clippy::exhaustive_enums,
29 clippy::exhaustive_structs,
30 clippy::future_not_send,
31 clippy::inconsistent_struct_constructor,
32 clippy::inefficient_to_string,
33 clippy::use_debug,
34 clippy::use_self
35)]
36
37use futures::{
38 future::BoxFuture,
39 ready,
40 task::{Context, Poll},
41 Future,
42};
43use hickory_resolver::{
44 error::{ResolveError, ResolveErrorKind},
45 lookup::SrvLookup,
46 TokioAsyncResolver,
47};
48use hyper::{client::connect::Connection, service::Service, Uri};
49use std::{error::Error, fmt, pin::Pin};
50use tokio::io::{AsyncRead, AsyncWrite};
51
52#[derive(Debug, Clone)]
57pub struct ServiceConnector<C> {
58 resolver: Option<TokioAsyncResolver>,
59 inner: C,
60}
61
62impl<C> Service<Uri> for ServiceConnector<C>
63where
64 C: Service<Uri> + Clone + Unpin,
65 C::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
66 C::Error: Into<Box<dyn Error + Send + Sync>>,
67 C::Future: Unpin + Send,
68{
69 type Response = C::Response;
70 type Error = ServiceError;
71 type Future = ServiceConnecting<C>;
72
73 fn poll_ready(&mut self, ctx: &mut Context) -> Poll<Result<(), Self::Error>> {
74 self.inner.poll_ready(ctx).map_err(ServiceError::inner)
75 }
76
77 fn call(&mut self, uri: Uri) -> Self::Future {
78 let fut = match (&self.resolver, uri.host(), uri.port()) {
79 (Some(resolver), Some(_), None) => ServiceConnectingKind::Preresolve {
80 inner: self.inner.clone(),
81 fut: {
82 let resolver = resolver.clone();
83 Box::pin(async move {
84 let host = uri.host().expect("host was right here, now it is gone");
85 let resolved = resolver.srv_lookup(host).await;
86 (resolved, uri)
87 })
88 },
89 },
90 _ => ServiceConnectingKind::Inner {
91 fut: self.inner.call(uri),
92 },
93 };
94 ServiceConnecting(fut)
95 }
96}
97
98impl<C> ServiceConnector<C> {
99 pub fn new(inner: C, resolver: Option<TokioAsyncResolver>) -> Self {
107 Self { resolver, inner }
108 }
109}
110
111#[derive(Debug)]
112enum ServiceErrorKind {
113 Resolve(Box<ResolveError>),
114 Inner(Box<dyn Error + Send + Sync>),
115}
116
117#[derive(Debug)]
121pub struct ServiceError(ServiceErrorKind);
122
123impl From<ResolveError> for ServiceError {
124 fn from(error: ResolveError) -> Self {
125 Self(ServiceErrorKind::Resolve(Box::new(error)))
126 }
127}
128
129impl fmt::Display for ServiceError {
130 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
131 match &self.0 {
132 ServiceErrorKind::Resolve(err) => fmt::Display::fmt(err, f),
133 ServiceErrorKind::Inner(err) => fmt::Display::fmt(err, f),
134 }
135 }
136}
137
138impl Error for ServiceError {
139 fn source(&self) -> Option<&(dyn Error + 'static)> {
140 match &self.0 {
141 ServiceErrorKind::Resolve(_) => None,
142 ServiceErrorKind::Inner(err) => Some(err.as_ref()),
143 }
144 }
145}
146
147impl ServiceError {
148 fn inner<E>(inner: E) -> Self
149 where
150 E: Into<Box<dyn Error + Send + Sync>>,
151 {
152 Self(ServiceErrorKind::Inner(inner.into()))
153 }
154}
155
156#[allow(clippy::large_enum_variant)]
157enum ServiceConnectingKind<C>
158where
159 C: Service<Uri> + Unpin,
160{
161 Preresolve {
162 inner: C,
163 fut: BoxFuture<'static, (Result<SrvLookup, ResolveError>, Uri)>,
164 },
165 Inner {
166 fut: C::Future,
167 },
168}
169
170impl<C> fmt::Debug for ServiceConnectingKind<C>
171where
172 C: Service<Uri> + Unpin,
173{
174 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
175 f.debug_struct("ServiceConnectingKind").finish()
176 }
177}
178
179#[derive(Debug)]
183pub struct ServiceConnecting<C>(ServiceConnectingKind<C>)
184where
185 C: Service<Uri> + Unpin;
186
187impl<C> Future for ServiceConnecting<C>
188where
189 C: Service<Uri> + Unpin,
190 C::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
191 C::Error: Into<Box<dyn Error + Send + Sync>>,
192 C::Future: Unpin + Send,
193{
194 type Output = Result<C::Response, ServiceError>;
195
196 fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
197 match &mut self.0 {
198 ServiceConnectingKind::Preresolve { inner, fut } => {
199 let (res, uri) = ready!(Pin::new(fut).poll(ctx));
200 let response = res.map(Some).or_else(|err| match err.kind() {
201 ResolveErrorKind::NoRecordsFound { .. } => Ok(None),
202 _unexpected => Err(ServiceError(ServiceErrorKind::Resolve(Box::new(err)))),
203 })?;
204 let uri = match response
205 .as_ref()
206 .and_then(|response| response.iter().next())
207 {
208 Some(srv) => {
209 let authority = format!("{}:{}", srv.target(), srv.port());
210 let builder = Uri::builder().authority(authority.as_str());
211 let builder = match uri.scheme() {
212 Some(scheme) => builder.scheme(scheme.clone()),
213 None => builder,
214 };
215 let builder = match uri.path_and_query() {
216 Some(path_and_query) => builder.path_and_query(path_and_query.clone()),
217 None => builder,
218 };
219 builder.build().map_err(ServiceError::inner)?
220 }
221 None => uri,
222 };
223 {
224 *self = Self(ServiceConnectingKind::Inner {
225 fut: inner.call(uri),
226 });
227 }
228 self.poll(ctx)
229 }
230 ServiceConnectingKind::Inner { fut } => {
231 Pin::new(fut).poll(ctx).map_err(ServiceError::inner)
232 }
233 }
234 }
235}