// HTTP(S) client that sends ingest bodies through a shared hyper connection pool.
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
use futures::{future, Stream};
use futures::future::Future;
pub use hyper::{Client as HyperClient, client::Builder as HyperBuilder};
use hyper::client::HttpConnector;
use hyper_rustls::HttpsConnector;
use rustls::ClientConfig as TlsConfig;
use tokio::runtime::Runtime;
use tokio::timer::Timeout;
use crate::body::IngestBody;
use crate::error::HttpError;
use crate::request::RequestTemplate;
use crate::response::{IngestResponse, Response};
/// Client that submits ingest bodies over a pooled hyper connection.
///
/// Requests are built from a [`RequestTemplate`] and sent through a hyper
/// client whose connector is a `hyper_rustls` HTTPS connector.
pub struct Client {
    /// Shared hyper client; held in an `Arc` so each in-flight request can
    /// own a handle to it.
    hyper: Arc<HyperClient<HttpsConnector<HttpConnector>>>,
    /// Template used to turn every body into a concrete request.
    template: RequestTemplate,
    /// Time limit applied to each request issued by `send`.
    timeout: Duration,
}
impl Client {
pub fn new(template: RequestTemplate, runtime: &mut Runtime) -> Self {
let exec = runtime.executor();
let reactor = runtime.reactor().clone();
let http_connector = {
let mut connector = HttpConnector::new_with_executor(
exec, Some(reactor),
);
connector.enforce_http(false);
connector
};
let tls_config = {
let mut cfg = TlsConfig::new();
cfg.root_store.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
cfg.ct_logs = Some(&ct_logs::LOGS);
cfg
};
let https_connector = HttpsConnector::from((http_connector, tls_config));
Client {
hyper: Arc::new(
HyperClient::builder()
.max_idle_per_host(20)
.build(https_connector)
),
template,
timeout: Duration::from_secs(5),
}
}
pub fn set_timeout(&mut self, timeout: Duration) {
self.timeout = timeout
}
pub fn send<T>(&self, body: T) -> IngestResponse<T>
where T: Deref<Target=IngestBody> + Send + 'static,
T: Clone,
{
let hyper = self.hyper.clone();
let tmp_body = body.clone();
let tmp_body1 = body.clone();
let timeout = self.timeout.clone();
Box::new(
self.template.new_request(body.clone())
.map_err(HttpError::from)
.and_then(move |req|
Timeout::new(
hyper.request(req)
.map_err(move |e| HttpError::Send(body, e)),
timeout,
).map_err(move |e| {
match e.into_inner() {
Some(e) => e,
None => HttpError::Timeout(tmp_body),
}
})
)
.and_then(|res| {
let status = res.status();
res.into_body()
.map_err(Into::into)
.fold(Vec::new(), |mut vec, chunk| {
vec.extend_from_slice(&*chunk);
future::ok::<_, HttpError<T>>(vec)
})
.and_then(|body| String::from_utf8(body).map_err(Into::into))
.map(move |reason| (status, reason))
})
.map(move |(status, reason)| {
if status.as_u16() < 200 || status.as_u16() >= 300 {
Response::Failed(tmp_body1, status, reason)
} else {
Response::Sent
}
})
)
}
}