1#![warn(unused_extern_crates)]
20pub use http;
21pub use hyper;
22#[macro_use]
23extern crate serde_derive;
24#[macro_use]
25extern crate failure;
26#[macro_use]
27extern crate log;
28
29use bytes::Bytes;
30pub use http::header;
31use http::response::Parts;
32pub use http::Request;
33use hyper::client::connect::HttpConnector;
34use hyper::rt::Future;
35pub use hyper::Body;
36use hyper_rustls::HttpsConnector;
37
38use futures::{future, Poll, Stream};
39use tokio::prelude::FutureExt;
40use tokio::runtime::Runtime;
41
42use crate::cache::{DnsCache, Value};
43use std::sync::{Arc, Mutex};
44
45pub use http::Uri;
46use std::collections::HashMap;
47use std::net::{SocketAddr, IpAddr};
48use std::time::{Duration, Instant};
49
50pub mod cache;
51mod connector;
52pub mod dns;
53pub mod socks5;
54use self::connector::Connector;
55pub use crate::dns::{DnsResolver, RecordType, Resolver};
56
57pub mod errors {
58 pub use failure::{Error, ResultExt};
59 pub type Result<T> = ::std::result::Result<T, Error>;
60}
61pub use crate::errors::*;
62
63#[derive(Debug)]
67pub struct Client<R: DnsResolver> {
68 client: hyper::Client<HttpsConnector<Connector<HttpConnector, R>>>,
69 cache: Arc<Mutex<DnsCache>>,
70}
71
72impl<R: DnsResolver + 'static> Client<R> {
73 pub fn new(resolver: R) -> Client<R> {
77 let connector = Connector::new(resolver);
78 let cache = connector.cache();
79 let https = connector
80 .with_https();
81 let client = hyper::Client::builder()
82 .keep_alive(false)
83 .build::<_, hyper::Body>(https);
84
85 Client {
86 client,
87 cache,
88 }
89 }
90
91 pub fn get(&self, url: &str) -> ResponseFuture {
95 let url = match url.parse::<Uri>() {
96 Ok(url) => url,
97 Err(e) => return ResponseFuture::new(future::err(e.into())),
98 };
99
100 let mut request = Request::builder();
101 let request = match request.uri(url).body(Body::empty()) {
102 Ok(request) => request,
103 Err(e) => return ResponseFuture::new(future::err(e.into())),
104 };
105
106 self.request(request)
107 }
108}
109
110impl Client<Resolver> {
111 pub fn with_system_resolver() -> Result<Client<Resolver>> {
113 let resolver = Resolver::from_system()
114 .context("Failed to load dns configuration")?;
115 Ok(Client::new(resolver))
116 }
117
118 pub fn with_system_resolver_v4() -> Result<Client<Resolver>> {
120 let resolver = Resolver::from_system_v4()
121 .context("Failed to load dns configuration")?;
122 Ok(Client::new(resolver))
123 }
124
125 pub fn with_socks5(proxy: SocketAddr) -> Client<Resolver> {
127 let resolver = Resolver::empty();
128 let connector = Connector::new(resolver);
129 let cache = connector.cache();
130 let https = connector
131 .with_socks5(proxy)
132 .with_https();
133 let client = hyper::Client::builder()
134 .keep_alive(false)
135 .build::<_, hyper::Body>(https);
136
137 Client {
138 client,
139 cache,
140 }
141 }
142}
143
144pub trait HttpClient {
146 fn request(&self, request: Request<hyper::Body>) -> ResponseFuture;
147}
148
149impl<R: DnsResolver + 'static> HttpClient for Client<R> {
150 fn request(&self, request: Request<hyper::Body>) -> ResponseFuture {
151 let client = self.client.clone();
152 let cache = self.cache.clone();
153
154 let uri = request.uri().clone();
155 info!("sending request to {:?}", uri);
156 let fut = client.request(request).map_err(Error::from)
157 .and_then(|res| {
158 debug!("http response: {:?}", res);
159 let (parts, body) = res.into_parts();
160 let body = body.concat2().map_err(Error::from);
161 (future::ok(parts), body)
162 }).map_err(Error::from);
163
164 let reply = fut.and_then(move |(parts, body)| {
165 let ipaddr = {
166 if let Some(host) = uri.host() {
167 let mut cache = cache.lock().unwrap();
168 if let Value::Some(ipaddr) = cache.get(host, Instant::now()) {
169 debug!("adding ip address to response: {}", ipaddr);
170 Some(ipaddr)
171 } else {
172 debug!("no ip address found in cache, this is unexpected");
173 None
174 }
175 } else {
176 None
177 }
178 };
179
180 let body = body.into_bytes();
181 let reply = Response::build(ipaddr, parts, body);
182 info!("got reply {:?}", reply);
183 Ok(reply)
184 });
185
186 ResponseFuture::new(reply)
187 }
188}
189
190#[must_use = "futures do nothing unless polled"]
192pub struct ResponseFuture(Box<dyn Future<Item = Response, Error = Error> + Send>);
193
194impl ResponseFuture {
195 pub(crate) fn new<F>(inner: F) -> Self
197 where
198 F: Future<Item = Response, Error = Error> + Send + 'static,
199 {
200 ResponseFuture(Box::new(inner))
201 }
202
203 pub fn with_timeout(self, timeout: Option<Duration>) -> Self {
205 match timeout {
206 Some(timeout) => {
207 let fut = self.timeout(timeout)
208 .map_err(|err| match err.into_inner() {
209 Some(err) => err,
210 _ => format_err!("Request timed out"),
211 });
212 ResponseFuture(Box::new(fut))
213 },
214 _ => self,
215 }
216 }
217
218 pub fn wait_for_response(self) -> Result<Response> {
220 let mut rt = Runtime::new()?;
221 rt.block_on(self)
222 }
223}
224
225impl Future for ResponseFuture {
226 type Item = Response;
227 type Error = Error;
228
229 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
230 self.0.poll()
231 }
232}
233
234#[derive(Debug)]
236pub struct Response {
237 pub status: u16,
238 pub headers: HashMap<String, String>,
239 pub cookies: Vec<String>,
240 pub ipaddr: Option<IpAddr>,
241 pub body: Bytes,
242}
243
244impl Response {
245 fn build(ipaddr: Option<IpAddr>, parts: Parts, body: Bytes) -> Response {
246 let cookies = parts
247 .headers
248 .get_all("set-cookie")
249 .into_iter()
250 .flat_map(|x| x.to_str().map(|x| x.to_owned()).ok())
251 .collect();
252
253 let mut headers = HashMap::new();
254
255 for (k, v) in parts.headers {
256 if let Some(k) = k {
257 if let Ok(v) = v.to_str() {
258 let k = String::from(k.as_str());
259 let v = String::from(v);
260
261 headers.insert(k, v);
262 }
263 }
264 }
265
266 Response {
267 status: parts.status.as_u16(),
268 headers,
269 cookies,
270 ipaddr,
271 body,
272 }
273 }
274}
275
276#[cfg(test)]
277mod tests {
278 use super::*;
279 use crate::dns::Resolver;
280 use std::time::{Duration, Instant};
281
282 #[test]
283 #[ignore]
284 fn verify_200_http() {
285 let resolver = Resolver::cloudflare();
286
287 let client = Client::new(resolver);
288 let reply = client
289 .get("http://httpbin.org/anything")
290 .wait_for_response()
291 .expect("request failed");
292 assert_eq!(reply.status, 200);
293 }
294
295 #[test]
296 #[ignore]
297 fn verify_200_https() {
298 let resolver = Resolver::cloudflare();
299
300 let client = Client::new(resolver);
301 let reply = client
302 .get("https://httpbin.org/anything")
303 .wait_for_response()
304 .expect("request failed");
305 assert_eq!(reply.status, 200);
306 }
307
308 #[test]
309 #[ignore]
310 fn verify_200_https_ipaddr() {
311 let resolver = Resolver::cloudflare();
312
313 let client = Client::new(resolver);
314 let reply = client
315 .get("http://1.1.1.1/")
316 .wait_for_response()
317 .expect("request failed");
318 assert_eq!(reply.status, 301);
319 }
320
321 #[test]
322 #[ignore]
323 fn verify_200_https_system_resolver() {
324 let client = Client::with_system_resolver().expect("failed to create client");
325 let reply = client
326 .get("https://httpbin.org/anything")
327 .wait_for_response()
328 .expect("request failed");
329 assert_eq!(reply.status, 200);
330 }
331
332 #[test]
348 #[ignore]
349 fn verify_timeout() {
350 let resolver = Resolver::cloudflare();
351 let client = Client::new(resolver);
352
353 let start = Instant::now();
354 let _reply = client.get("http://1.2.3.4")
355 .with_timeout(Some(Duration::from_millis(250)))
356 .wait_for_response().err();
357 let end = Instant::now();
358
359 assert!(end.duration_since(start) < Duration::from_secs(1));
360 }
361}