1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
mod auth;
#[cfg(feature = "gzip")] mod compression;
mod headers;
mod log;
mod tls;
mod url;
use self::{log::LogRequest, url::set_cluster_url};
use auth::AuthLayer;
#[cfg(feature = "gzip")] use compression::{accept_compressed, maybe_decompress};
use headers::set_default_headers;
use tls::HttpsConnector;
use std::convert::{TryFrom, TryInto};
use http::{HeaderValue, Request, Response};
use hyper::{Body, Client as HyperClient};
use hyper_timeout::TimeoutConnector;
use tower::{buffer::Buffer, util::BoxService, BoxError, ServiceBuilder};
use crate::{error::ConfigError, Config, Error, Result};
use auth::Authentication;
type InnerService = Buffer<BoxService<Request<Body>, Response<Body>, BoxError>, Request<Body>>;
#[derive(Clone)]
pub struct Service {
inner: InnerService,
}
impl Service {
pub fn new<S>(inner: S) -> Self
where
S: tower::Service<Request<Body>, Response = Response<Body>, Error = BoxError> + Send + 'static,
S::Future: Send + 'static,
{
Self {
inner: Buffer::new(BoxService::new(inner), 1024),
}
}
}
impl tower::Service<Request<Body>> for Service {
type Error = <InnerService as tower::Service<Request<Body>>>::Error;
type Future = <InnerService as tower::Service<Request<Body>>>::Future;
type Response = <InnerService as tower::Service<Request<Body>>>::Response;
fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, req: Request<Body>) -> Self::Future {
self.inner.call(req)
}
}
impl TryFrom<Config> for Service {
type Error = Error;
fn try_from(config: Config) -> Result<Self> {
let cluster_url = config.cluster_url.clone();
let mut default_headers = config.headers.clone();
let timeout = config.timeout;
let maybe_auth = match Authentication::try_from(&config.auth_info)? {
Authentication::None => None,
Authentication::Basic(s) => {
let mut value =
HeaderValue::try_from(format!("Basic {}", &s)).map_err(ConfigError::InvalidBasicAuth)?;
value.set_sensitive(true);
default_headers.insert(http::header::AUTHORIZATION, value);
None
}
Authentication::Token(s) => {
let mut value = HeaderValue::try_from(format!("Bearer {}", &s))
.map_err(ConfigError::InvalidBearerToken)?;
value.set_sensitive(true);
default_headers.insert(http::header::AUTHORIZATION, value);
None
}
Authentication::RefreshableToken(r) => Some(AuthLayer::new(r)),
};
let common = ServiceBuilder::new()
.map_request(move |r| set_cluster_url(r, &cluster_url))
.map_request(move |r| set_default_headers(r, default_headers.clone()))
.into_inner();
#[cfg(feature = "gzip")]
let common = ServiceBuilder::new()
.layer(common)
.map_request(accept_compressed)
.map_response(maybe_decompress)
.into_inner();
let https: HttpsConnector<_> = config.try_into()?;
let mut connector = TimeoutConnector::new(https);
if let Some(timeout) = timeout {
connector.set_connect_timeout(Some(timeout));
connector.set_read_timeout(Some(timeout));
}
let client: HyperClient<_, Body> = HyperClient::builder().build(connector);
let inner = ServiceBuilder::new()
.layer(common)
.option_layer(maybe_auth)
.layer(tower::layer::layer_fn(LogRequest::new))
.service(client);
Ok(Self::new(inner))
}
}