1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
use std::sync::Arc;
use futures::future;
use futures::future::Future;
use futures::stream::Stream;
use hyper::Client as HyperClient;
use hyper::client::HttpConnector;
use hyper_rustls::HttpsConnector;
use rustls::ClientConfig as TlsConfig;
use tokio::runtime::Runtime;
use crate::body::IngestBody;
use crate::error::HttpError;
use crate::request::RequestTemplate;
use crate::response::{IngestResponse, Response};
pub struct Client {
hyper: Arc<HyperClient<HttpsConnector<HttpConnector>>>,
template: RequestTemplate,
}
impl Client {
pub fn new(template: RequestTemplate, runtime: &mut Runtime) -> Self {
let exec = runtime.executor();
let reactor = runtime.reactor().clone();
let http_connector = {
let mut connector = HttpConnector::new_with_executor(
exec, Some(reactor),
);
connector.enforce_http(false);
connector
};
let tls_config = {
let mut cfg = TlsConfig::new();
cfg.root_store.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
cfg.ct_logs = Some(&ct_logs::LOGS);
cfg
};
let https_connector = HttpsConnector::from((http_connector, tls_config));
Client {
hyper: Arc::new(HyperClient::builder().build(https_connector)),
template,
}
}
pub fn send(&self, body: IngestBody) -> IngestResponse {
let hyper = self.hyper.clone();
let body = Arc::new(body);
let tmp_body = body.clone();
Box::new(
self.template.new_request(body.clone())
.map_err(HttpError::from)
.and_then(move |req|
hyper.request(req)
.map_err(move |e| HttpError::Send(body, e))
)
.and_then(|res| {
let status = res.status();
res.into_body()
.map_err(Into::into)
.fold(Vec::new(), |mut vec, chunk| {
vec.extend_from_slice(&*chunk);
future::ok::<_, HttpError>(vec)
})
.and_then(|body| String::from_utf8(body).map_err(Into::into))
.map(move |reason| (status, reason))
})
.map(move |(status, reason)| {
if status.as_u16() < 200 || status.as_u16() >= 300 {
Response::Failed(tmp_body, status, reason)
} else {
Response::Sent
}
})
)
}
}