tetsy_fetch/
client.rs

1// Copyright 2015-2020 Tetsy Technologies (UK) Ltd.
2// This file is part of Tetsy Vapory.
3
4// Tetsy Vapory is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Tetsy Vapory is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Tetsy Vapory.  If not, see <http://www.gnu.org/licenses/>.
16
17use futures::future::{self, Loop};
18use futures::sync::{mpsc, oneshot};
19use futures::{self, Future, Async, Sink, Stream};
20use hyper::header::{self, HeaderMap, HeaderValue, IntoHeaderName};
21use hyper::{self, Method, StatusCode};
22use hyper_rustls;
23use std;
24use std::cmp::min;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
27use std::sync::mpsc::RecvTimeoutError;
28use std::thread;
29use std::time::Duration;
30use std::{io, fmt};
31use tokio::{self, util::FutureExt};
32use url::{self, Url};
33use bytes::Bytes;
34
35const MAX_SIZE: usize = 64 * 1024 * 1024;
36const MAX_SECS: Duration = Duration::from_secs(5);
37const MAX_REDR: usize = 5;
38
39/// A handle to abort requests.
40///
41/// Requests are either aborted based on reaching thresholds such as
42/// maximum response size, timeouts or too many redirects, or else
43/// they can be aborted explicitly by the calling code.
44#[derive(Clone, Debug)]
45pub struct Abort {
46	abort: Arc<AtomicBool>,
47	size: usize,
48	time: Duration,
49	redir: usize,
50}
51
52impl Default for Abort {
53	fn default() -> Abort {
54		Abort {
55			abort: Arc::new(AtomicBool::new(false)),
56			size: MAX_SIZE,
57			time: MAX_SECS,
58			redir: MAX_REDR,
59		}
60	}
61}
62
63impl From<Arc<AtomicBool>> for Abort {
64	fn from(a: Arc<AtomicBool>) -> Abort {
65		Abort {
66			abort: a,
67			size: MAX_SIZE,
68			time: MAX_SECS,
69			redir: MAX_REDR,
70		}
71	}
72}
73
74impl Abort {
75	/// True if `abort` has been invoked.
76	pub fn is_aborted(&self) -> bool {
77		self.abort.load(Ordering::SeqCst)
78	}
79
80	/// The maximum response body size.
81	pub fn max_size(&self) -> usize {
82		self.size
83	}
84
85	/// The maximum total time, including redirects.
86	pub fn max_duration(&self) -> Duration {
87		self.time
88	}
89
90	/// The maximum number of redirects to allow.
91	pub fn max_redirects(&self) -> usize {
92		self.redir
93	}
94
95	/// Mark as aborted.
96	pub fn abort(&self) {
97		self.abort.store(true, Ordering::SeqCst)
98	}
99
100	/// Set the maximum reponse body size.
101	pub fn with_max_size(self, n: usize) -> Abort {
102		Abort { size: n, .. self }
103	}
104
105	/// Set the maximum duration (including redirects).
106	pub fn with_max_duration(self, d: Duration) -> Abort {
107		Abort { time: d, .. self }
108	}
109
110	/// Set the maximum number of redirects to follow.
111	pub fn with_max_redirects(self, n: usize) -> Abort {
112		Abort { redir: n, .. self }
113	}
114}
115
116/// Types which retrieve content from some URL.
117pub trait Fetch: Clone + Send + Sync + 'static {
118	/// The result future.
119	type Result: Future<Item = Response, Error = Error> + Send + 'static;
120
121	/// Make a request to given URL
122	fn fetch(&self, request: Request, abort: Abort) -> Self::Result;
123
124	/// Get content from some URL.
125	fn get(&self, url: &str, abort: Abort) -> Self::Result;
126
127	/// Post content to some URL.
128	fn post(&self, url: &str, abort: Abort) -> Self::Result;
129}
130
131type TxResponse = oneshot::Sender<Result<Response, Error>>;
132type TxStartup = std::sync::mpsc::SyncSender<Result<(), tokio::io::Error>>;
133type ChanItem = Option<(Request, Abort, TxResponse)>;
134
135/// An implementation of `Fetch` using a `hyper` client.
136// Due to the `Send` bound of `Fetch` we spawn a background thread for
137// actual request/response processing as `hyper::Client` itself does
138// not implement `Send` currently.
139#[derive(Debug)]
140pub struct Client {
141	runtime: mpsc::Sender<ChanItem>,
142	refs: Arc<AtomicUsize>,
143}
144
145// When cloning a client we increment the internal reference counter.
146impl Clone for Client {
147	fn clone(&self) -> Client {
148		self.refs.fetch_add(1, Ordering::SeqCst);
149		Client {
150			runtime: self.runtime.clone(),
151			refs: self.refs.clone(),
152		}
153	}
154}
155
156// When dropping a client, we decrement the reference counter.
157// Once it reaches 0 we terminate the background thread.
158impl Drop for Client {
159	fn drop(&mut self) {
160		if self.refs.fetch_sub(1, Ordering::SeqCst) == 1 {
161			// ignore send error as it means the background thread is gone already
162			let _ = self.runtime.clone().send(None).wait();
163		}
164	}
165}
166
167impl Client {
168	/// Create a new fetch client.
169	pub fn new(num_dns_threads: usize) -> Result<Self, Error> {
170		let (tx_start, rx_start) = std::sync::mpsc::sync_channel(1);
171		let (tx_proto, rx_proto) = mpsc::channel(64);
172
173		Client::background_thread(tx_start, rx_proto, num_dns_threads)?;
174
175		match rx_start.recv_timeout(Duration::from_secs(10)) {
176			Err(RecvTimeoutError::Timeout) => {
177				error!(target: "fetch", "timeout starting background thread");
178				return Err(Error::BackgroundThreadDead)
179			}
180			Err(RecvTimeoutError::Disconnected) => {
181				error!(target: "fetch", "background thread gone");
182				return Err(Error::BackgroundThreadDead)
183			}
184			Ok(Err(e)) => {
185				error!(target: "fetch", "error starting background thread: {}", e);
186				return Err(e.into())
187			}
188			Ok(Ok(())) => {}
189		}
190
191		Ok(Client {
192			runtime: tx_proto,
193			refs: Arc::new(AtomicUsize::new(1)),
194		})
195	}
196
197	fn background_thread(tx_start: TxStartup, rx_proto: mpsc::Receiver<ChanItem>, num_dns_threads: usize) -> io::Result<thread::JoinHandle<()>> {
198		thread::Builder::new().name("fetch".into()).spawn(move || {
199			let mut runtime = match tokio::runtime::current_thread::Runtime::new() {
200				Ok(c) => c,
201				Err(e) => return tx_start.send(Err(e)).unwrap_or(())
202			};
203
204			let hyper = hyper::Client::builder()
205				.build(hyper_rustls::HttpsConnector::new(num_dns_threads));
206
207			let future = rx_proto.take_while(|item| Ok(item.is_some()))
208				.map(|item| item.expect("`take_while` is only passing on channel items != None; qed"))
209				.for_each(|(request, abort, sender)|
210			{
211				trace!(target: "fetch", "new request to {}", request.url());
212				if abort.is_aborted() {
213					return future::ok(sender.send(Err(Error::Aborted)).unwrap_or(()))
214				}
215				let ini = (hyper.clone(), request, abort, 0);
216				let fut = future::loop_fn(ini, |(client, request, abort, redirects)| {
217					let request2 = request.clone();
218					let url2 = request2.url().clone();
219					let abort2 = abort.clone();
220					client.request(request.into())
221						.map(move |resp| Response::new(url2, resp, abort2))
222						.from_err()
223						.and_then(move |resp| {
224							if abort.is_aborted() {
225								debug!(target: "fetch", "fetch of {} aborted", request2.url());
226								return Err(Error::Aborted)
227							}
228							if let Some((next_url, preserve_method)) = redirect_location(request2.url().clone(), &resp) {
229								if redirects >= abort.max_redirects() {
230									return Err(Error::TooManyRedirects)
231								}
232								let request = if preserve_method {
233									let mut request2 = request2.clone();
234									request2.set_url(next_url);
235									request2
236								} else {
237									Request::new(next_url, Method::GET)
238								};
239								Ok(Loop::Continue((client, request, abort, redirects + 1)))
240							} else {
241								if let Some(ref h_val) = resp.headers.get(header::CONTENT_LENGTH) {
242									let content_len = h_val
243										.to_str()?
244										.parse::<u64>()?;
245
246									if content_len > abort.max_size() as u64 {
247										return Err(Error::SizeLimit)
248									}
249								}
250								Ok(Loop::Break(resp))
251							}
252						})
253					})
254					.then(|result| {
255						future::ok(sender.send(result).unwrap_or(()))
256					});
257				tokio::spawn(fut);
258				trace!(target: "fetch", "waiting for next request ...");
259				future::ok(())
260			});
261
262			tx_start.send(Ok(())).unwrap_or(());
263
264			debug!(target: "fetch", "processing requests ...");
265			if let Err(()) = runtime.block_on(future) {
266				error!(target: "fetch", "error while executing future")
267			}
268			debug!(target: "fetch", "fetch background thread finished")
269		})
270	}
271}
272
273impl Fetch for Client {
274	type Result = Box<dyn Future<Item=Response, Error=Error> + Send + 'static>;
275
276	fn fetch(&self, request: Request, abort: Abort) -> Self::Result {
277		debug!(target: "fetch", "fetching: {:?}", request.url());
278		if abort.is_aborted() {
279			return Box::new(future::err(Error::Aborted))
280		}
281		let (tx_res, rx_res) = oneshot::channel();
282		let maxdur = abort.max_duration();
283		let sender = self.runtime.clone();
284		let future = sender.send(Some((request, abort, tx_res)))
285			.map_err(|e| {
286				error!(target: "fetch", "failed to schedule request: {}", e);
287				Error::BackgroundThreadDead
288			})
289			.and_then(|_| rx_res.map_err(|oneshot::Canceled| Error::BackgroundThreadDead))
290			.and_then(future::result);
291
292		Box::new(future.timeout(maxdur)
293			.map_err(|err| {
294				if err.is_inner() {
295					Error::from(err.into_inner().unwrap())
296				} else {
297					Error::from(err)
298				}
299			})
300		)
301	}
302
303	/// Get content from some URL.
304	fn get(&self, url: &str, abort: Abort) -> Self::Result {
305		let url: Url = match url.parse() {
306			Ok(u) => u,
307			Err(e) => return Box::new(future::err(e.into()))
308		};
309		self.fetch(Request::get(url), abort)
310	}
311
312	/// Post content to some URL.
313	fn post(&self, url: &str, abort: Abort) -> Self::Result {
314		let url: Url = match url.parse() {
315			Ok(u) => u,
316			Err(e) => return Box::new(future::err(e.into()))
317		};
318		self.fetch(Request::post(url), abort)
319	}
320}
321
322// Extract redirect location from response. The second return value indicate whether the original method should be preserved.
323fn redirect_location(u: Url, r: &Response) -> Option<(Url, bool)> {
324	let preserve_method = match r.status() {
325		StatusCode::TEMPORARY_REDIRECT | StatusCode::PERMANENT_REDIRECT => true,
326		_ => false,
327	};
328	match r.status() {
329		StatusCode::MOVED_PERMANENTLY
330		| StatusCode::PERMANENT_REDIRECT
331		| StatusCode::TEMPORARY_REDIRECT
332		| StatusCode::FOUND
333		| StatusCode::SEE_OTHER => {
334			r.headers.get(header::LOCATION).and_then(|loc| {
335				loc.to_str().ok().and_then(|loc_s| {
336					u.join(loc_s).ok().map(|url| (url, preserve_method))
337				})
338			})
339		}
340		_ => None
341	}
342}
343
344/// A wrapper for hyper::Request using Url and with methods.
345#[derive(Debug, Clone)]
346pub struct Request {
347	url: Url,
348	method: Method,
349	headers: HeaderMap,
350	body: Bytes,
351}
352
353impl Request {
354	/// Create a new request, with given url and method.
355	pub fn new(url: Url, method: Method) -> Request {
356		Request {
357			url, method,
358			headers: HeaderMap::new(),
359			body: Default::default(),
360		}
361	}
362
363	/// Create a new GET request.
364	pub fn get(url: Url) -> Request {
365		Request::new(url, Method::GET)
366	}
367
368	/// Create a new empty POST request.
369	pub fn post(url: Url) -> Request {
370		Request::new(url, Method::POST)
371	}
372
373	/// Read the url.
374	pub fn url(&self) -> &Url {
375		&self.url
376	}
377
378	/// Read the request headers.
379	pub fn headers(&self) -> &HeaderMap {
380		&self.headers
381	}
382
383	/// Get a mutable reference to the headers.
384	pub fn headers_mut(&mut self) -> &mut HeaderMap {
385		&mut self.headers
386	}
387
388	/// Set the body of the request.
389	pub fn set_body<T: Into<Bytes>>(&mut self, body: T) {
390		self.body = body.into();
391	}
392
393	/// Set the url of the request.
394	pub fn set_url(&mut self, url: Url) {
395		self.url = url;
396	}
397
398	/// Consume self, and return it with the added given header.
399	pub fn with_header<K>(mut self, key: K, val: HeaderValue) -> Self
400		where K: IntoHeaderName,
401	{
402		self.headers_mut().append(key, val);
403		self
404	}
405
406	/// Consume self, and return it with the body.
407	pub fn with_body<T: Into<Bytes>>(mut self, body: T) -> Self {
408		self.set_body(body);
409		self
410	}
411}
412
413impl From<Request> for hyper::Request<hyper::Body> {
414	fn from(req: Request) -> hyper::Request<hyper::Body> {
415		let uri: hyper::Uri = req.url.as_ref().parse().expect("Every valid URLis also a URI.");
416		hyper::Request::builder()
417			.method(req.method)
418			.uri(uri)
419			.header(header::USER_AGENT, HeaderValue::from_static("Tetsy Fetch Neo"))
420			.body(req.body.into())
421			.expect("Header, uri, method, and body are already valid and can not fail to parse; qed")
422	}
423}
424
425/// An HTTP response.
426#[derive(Debug)]
427pub struct Response {
428	url: Url,
429	status: StatusCode,
430	headers: HeaderMap,
431	body: hyper::Body,
432	abort: Abort,
433	nread: usize,
434}
435
436impl Response {
437	/// Create a new response, wrapping a hyper response.
438	pub fn new(u: Url, r: hyper::Response<hyper::Body>, a: Abort) -> Response {
439		Response {
440			url: u,
441			status: r.status(),
442			headers: r.headers().clone(),
443			body: r.into_body(),
444			abort: a,
445			nread: 0,
446		}
447	}
448
449	/// The response status.
450	pub fn status(&self) -> StatusCode {
451		self.status
452	}
453
454	/// Status code == OK (200)?
455	pub fn is_success(&self) -> bool {
456		self.status() == StatusCode::OK
457	}
458
459	/// Status code == 404.
460	pub fn is_not_found(&self) -> bool {
461		self.status() == StatusCode::NOT_FOUND
462	}
463
464	/// Is the content-type text/html?
465	pub fn is_html(&self) -> bool {
466		self.headers.get(header::CONTENT_TYPE).and_then(|ct_val| {
467			ct_val.to_str().ok().map(|ct_str| {
468				ct_str.contains("text") && ct_str.contains("html")
469			})
470		}).unwrap_or(false)
471	}
472}
473
474impl Stream for Response {
475	type Item = hyper::Chunk;
476	type Error = Error;
477
478	fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
479		if self.abort.is_aborted() {
480			debug!(target: "fetch", "fetch of {} aborted", self.url);
481			return Err(Error::Aborted)
482		}
483		match try_ready!(self.body.poll()) {
484			None => Ok(Async::Ready(None)),
485			Some(c) => {
486				if self.nread + c.len() > self.abort.max_size() {
487					debug!(target: "fetch", "size limit {:?} for {} exceeded", self.abort.max_size(), self.url);
488					return Err(Error::SizeLimit)
489				}
490				self.nread += c.len();
491				Ok(Async::Ready(Some(c)))
492			}
493		}
494	}
495}
496
497/// `BodyReader` serves as an adapter from async to sync I/O.
498///
499/// It implements `io::Read` by repedately waiting for the next `Chunk`
500/// of hyper's response `Body` which blocks the current thread.
501pub struct BodyReader {
502	chunk: hyper::Chunk,
503	body: Option<hyper::Body>,
504	abort: Abort,
505	offset: usize,
506	count: usize,
507}
508
509impl BodyReader {
510	/// Create a new body reader for the given response.
511	pub fn new(r: Response) -> BodyReader {
512		BodyReader {
513			body: Some(r.body),
514			chunk: Default::default(),
515			abort: r.abort,
516			offset: 0,
517			count: 0,
518		}
519	}
520}
521
522impl io::Read for BodyReader {
523	fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
524		let mut n = 0;
525		while self.body.is_some() {
526			// Can we still read from the current chunk?
527			if self.offset < self.chunk.len() {
528				let k = min(self.chunk.len() - self.offset, buf.len() - n);
529				if self.count + k > self.abort.max_size() {
530					debug!(target: "fetch", "size limit {:?} exceeded", self.abort.max_size());
531					return Err(io::Error::new(io::ErrorKind::PermissionDenied, "size limit exceeded"))
532				}
533				let c = &self.chunk[self.offset .. self.offset + k];
534				(&mut buf[n .. n + k]).copy_from_slice(c);
535				self.offset += k;
536				self.count += k;
537				n += k;
538				if n == buf.len() {
539					break
540				}
541			} else {
542				let body = self.body.take().expect("loop condition ensures `self.body` is always defined; qed");
543				match body.into_future().wait() { // wait for next chunk
544					Err((e, _)) => {
545						error!(target: "fetch", "failed to read chunk: {}", e);
546						return Err(io::Error::new(io::ErrorKind::Other, "failed to read body chunk"))
547					}
548					Ok((None, _)) => break, // body is exhausted, break out of the loop
549					Ok((Some(c), b)) => {
550						self.body = Some(b);
551						self.chunk = c;
552						self.offset = 0
553					}
554				}
555			}
556		}
557		Ok(n)
558	}
559}
560
561/// Fetch error cases.
562#[derive(Debug)]
563pub enum Error {
564	/// Hyper gave us an error.
565	Hyper(hyper::Error),
566	/// A hyper header conversion error.
567	HyperHeaderToStrError(hyper::header::ToStrError),
568	/// An integer parsing error.
569	ParseInt(std::num::ParseIntError),
570	/// Some I/O error occured.
571	Io(io::Error),
572	/// Invalid URLs where attempted to parse.
573	Url(url::ParseError),
574	/// Calling code invoked `Abort::abort`.
575	Aborted,
576	/// Too many redirects have been encountered.
577	TooManyRedirects,
578	/// tokio-timer inner future gave us an error.
579	TokioTimeoutInnerVal(String),
580	/// tokio-timer gave us an error.
581	TokioTimer(Option<tokio::timer::Error>),
582	/// The maximum duration was reached.
583	Timeout,
584	/// The response body is too large.
585	SizeLimit,
586	/// The background processing thread does not run.
587	BackgroundThreadDead,
588}
589
590impl fmt::Display for Error {
591	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
592		match *self {
593			Error::Aborted => write!(fmt, "The request has been aborted."),
594			Error::Hyper(ref e) => write!(fmt, "{}", e),
595			Error::HyperHeaderToStrError(ref e) => write!(fmt, "{}", e),
596			Error::ParseInt(ref e) => write!(fmt, "{}", e),
597			Error::Url(ref e) => write!(fmt, "{}", e),
598			Error::Io(ref e) => write!(fmt, "{}", e),
599			Error::BackgroundThreadDead => write!(fmt, "background thread gond"),
600			Error::TooManyRedirects => write!(fmt, "too many redirects"),
601			Error::TokioTimeoutInnerVal(ref s) => write!(fmt, "tokio timer inner value error: {:?}", s),
602			Error::TokioTimer(ref e) => write!(fmt, "tokio timer error: {:?}", e),
603			Error::Timeout => write!(fmt, "request timed out"),
604			Error::SizeLimit => write!(fmt, "size limit reached"),
605		}
606	}
607}
608
609impl ::std::error::Error for Error {
610	fn description(&self) -> &str { "Fetch client error" }
611	fn cause(&self) -> Option<&dyn std::error::Error> { None }
612}
613
614impl From<hyper::Error> for Error {
615	fn from(e: hyper::Error) -> Self {
616		Error::Hyper(e)
617	}
618}
619
620impl From<hyper::header::ToStrError> for Error {
621	fn from(e: hyper::header::ToStrError) -> Self {
622		Error::HyperHeaderToStrError(e)
623	}
624}
625
626impl From<std::num::ParseIntError> for Error {
627	fn from(e: std::num::ParseIntError) -> Self {
628		Error::ParseInt(e)
629	}
630}
631
632impl From<io::Error> for Error {
633	fn from(e: io::Error) -> Self {
634		Error::Io(e)
635	}
636}
637
638impl From<url::ParseError> for Error {
639	fn from(e: url::ParseError) -> Self {
640		Error::Url(e)
641	}
642}
643
644impl<T: std::fmt::Debug> From<tokio::timer::timeout::Error<T>> for Error {
645	fn from(e: tokio::timer::timeout::Error<T>) -> Self {
646		if e.is_inner() {
647			Error::TokioTimeoutInnerVal(format!("{:?}", e.into_inner().unwrap()))
648		} else if e.is_elapsed() {
649			Error::Timeout
650		} else {
651			Error::TokioTimer(e.into_timer())
652		}
653	}
654}
655
656impl From<tokio::timer::Error> for Error {
657	fn from(e: tokio::timer::Error) -> Self {
658		Error::TokioTimer(Some(e))
659	}
660}
661
662#[cfg(test)]
663mod test {
664	use super::*;
665	use futures::future;
666	use futures::sync::oneshot;
667	use hyper::{
668		StatusCode,
669		service::Service,
670	};
671	use tokio::timer::Delay;
672	use tokio::runtime::current_thread::Runtime;
673	use std::io::Read;
674	use std::net::SocketAddr;
675
676	const ADDRESS: &str = "127.0.0.1:0";
677
678	#[test]
679	fn it_should_fetch() {
680		let server = TestServer::run();
681		let client = Client::new(4).unwrap();
682		let mut runtime = Runtime::new().unwrap();
683
684		let future = client.get(&format!("http://{}?123", server.addr()), Abort::default())
685			.map(|resp| {
686				assert!(resp.is_success());
687				resp
688			})
689			.map(|resp| resp.concat2())
690			.flatten()
691			.map(|body| assert_eq!(&body[..], b"123"))
692			.map_err(|err| panic!(err));
693
694		runtime.block_on(future).unwrap();
695	}
696
697	#[test]
698	fn it_should_fetch_in_light_mode() {
699		let server = TestServer::run();
700		let client = Client::new(1).unwrap();
701		let mut runtime = Runtime::new().unwrap();
702
703		let future = client.get(&format!("http://{}?123", server.addr()), Abort::default())
704			.map(|resp| {
705				assert!(resp.is_success());
706				resp
707			})
708			.map(|resp| resp.concat2())
709			.flatten()
710			.map(|body| assert_eq!(&body[..], b"123"))
711			.map_err(|err| panic!(err));
712
713		runtime.block_on(future).unwrap();
714	}
715
716	#[test]
717	fn it_should_timeout() {
718		let server = TestServer::run();
719		let client = Client::new(4).unwrap();
720		let mut runtime = Runtime::new().unwrap();
721
722		let abort = Abort::default().with_max_duration(Duration::from_secs(1));
723
724		let future = client.get(&format!("http://{}/delay?3", server.addr()), abort)
725			.then(|res| {
726				match res {
727					Err(Error::Timeout) => Ok::<_, ()>(()),
728					other => panic!("expected timeout, got {:?}", other),
729				}
730			});
731
732		runtime.block_on(future).unwrap();
733	}
734
735	#[test]
736	fn it_should_follow_redirects() {
737		let server = TestServer::run();
738		let client = Client::new(4).unwrap();
739		let mut runtime = Runtime::new().unwrap();
740
741		let abort = Abort::default();
742
743		let future = client.get(&format!("http://{}/redirect?http://{}/", server.addr(), server.addr()), abort)
744			.and_then(|resp| {
745				if resp.is_success() { Ok(()) } else { panic!("Response unsuccessful") }
746			});
747
748		runtime.block_on(future).unwrap();
749	}
750
751	#[test]
752	fn it_should_follow_relative_redirects() {
753		let server = TestServer::run();
754		let client = Client::new(4).unwrap();
755		let mut runtime = Runtime::new().unwrap();
756
757		let abort = Abort::default().with_max_redirects(4);
758		let future = client.get(&format!("http://{}/redirect?/", server.addr()), abort)
759			.and_then(|resp| {
760				if resp.is_success() { Ok(()) } else { panic!("Response unsuccessful") }
761			});
762
763		runtime.block_on(future).unwrap();
764	}
765
766	#[test]
767	fn it_should_not_follow_too_many_redirects() {
768		let server = TestServer::run();
769		let client = Client::new(4).unwrap();
770		let mut runtime = Runtime::new().unwrap();
771
772		let abort = Abort::default().with_max_redirects(3);
773		let future = client.get(&format!("http://{}/loop", server.addr()), abort)
774			.then(|res| {
775				match res {
776					Err(Error::TooManyRedirects) => Ok::<_, ()>(()),
777					other => panic!("expected too many redirects error, got {:?}", other)
778				}
779			});
780
781		runtime.block_on(future).unwrap();
782	}
783
784	#[test]
785	fn it_should_read_data() {
786		let server = TestServer::run();
787		let client = Client::new(4).unwrap();
788		let mut runtime = Runtime::new().unwrap();
789
790		let abort = Abort::default();
791		let future = client.get(&format!("http://{}?abcdefghijklmnopqrstuvwxyz", server.addr()), abort)
792			.and_then(|resp| {
793				if resp.is_success() { Ok(resp) } else { panic!("Response unsuccessful") }
794			})
795			.map(|resp| resp.concat2())
796			.flatten()
797			.map(|body| assert_eq!(&body[..], b"abcdefghijklmnopqrstuvwxyz"));
798
799		runtime.block_on(future).unwrap();
800	}
801
802	#[test]
803	fn it_should_not_read_too_much_data() {
804		let server = TestServer::run();
805		let client = Client::new(4).unwrap();
806		let mut runtime = Runtime::new().unwrap();
807
808		let abort = Abort::default().with_max_size(3);
809		let future = client.get(&format!("http://{}/?1234", server.addr()), abort)
810			.and_then(|resp| {
811				if resp.is_success() { Ok(resp) } else { panic!("Response unsuccessful") }
812			})
813			.map(|resp| resp.concat2())
814			.flatten()
815			.then(|body| {
816				match body {
817					Err(Error::SizeLimit) => Ok::<_, ()>(()),
818					other => panic!("expected size limit error, got {:?}", other),
819				}
820			});
821
822		runtime.block_on(future).unwrap();
823	}
824
825	#[test]
826	fn it_should_not_read_too_much_data_sync() {
827		let server = TestServer::run();
828		let client = Client::new(4).unwrap();
829		let mut runtime = Runtime::new().unwrap();
830
831		// let abort = Abort::default().with_max_size(3);
832		// let resp = client.get(&format!("http://{}/?1234", server.addr()), abort).wait().unwrap();
833		// assert!(resp.is_success());
834		// let mut buffer = Vec::new();
835		// let mut reader = BodyReader::new(resp);
836		// match reader.read_to_end(&mut buffer) {
837		// 	Err(ref e) if e.kind() == io::ErrorKind::PermissionDenied => {}
838		// 	other => panic!("expected size limit error, got {:?}", other)
839		// }
840
841		// FIXME (c0gent): The prior version of this test (pre-hyper-0.12,
842		// commented out above) is not possible to recreate. It relied on an
843		// apparent bug in `Client::background_thread` which suppressed the
844		// `SizeLimit` error from occurring. This is due to the headers
845		// collection not returning a value for content length when queried.
846		// The precise reason why this was happening is unclear.
847
848		let abort = Abort::default().with_max_size(3);
849		let future = client.get(&format!("http://{}/?1234", server.addr()), abort)
850			.and_then(|resp| {
851				assert_eq!(true, false, "Unreachable. (see FIXME note)");
852				assert!(resp.is_success());
853				let mut buffer = Vec::new();
854				let mut reader = BodyReader::new(resp);
855				match reader.read_to_end(&mut buffer) {
856					Err(ref e) if e.kind() == io::ErrorKind::PermissionDenied => Ok(()),
857					other => panic!("expected size limit error, got {:?}", other)
858				}
859			});
860
861		// FIXME: This simply demonstrates the above point.
862		match runtime.block_on(future) {
863			Err(Error::SizeLimit) => {},
864			other => panic!("Expected `Error::SizeLimit`, got: {:?}", other),
865		}
866	}
867
868	struct TestServer;
869
870	impl Service for TestServer {
871		type ReqBody = hyper::Body;
872		type ResBody = hyper::Body;
873		type Error = Error;
874		type Future = Box<dyn Future<Item=hyper::Response<Self::ResBody>, Error=Self::Error> + Send + 'static>;
875
876		fn call(&mut self, req: hyper::Request<hyper::Body>) -> Self::Future {
877			match req.uri().path() {
878				"/" => {
879					let body = req.uri().query().unwrap_or("").to_string();
880					let res = hyper::Response::new(body.into());
881					Box::new(future::ok(res))
882				}
883				"/redirect" => {
884					let loc = req.uri().query().unwrap_or("/").to_string();
885					let res = hyper::Response::builder()
886						.status(StatusCode::MOVED_PERMANENTLY)
887						.header(hyper::header::LOCATION, loc)
888						.body(hyper::Body::empty())
889						.expect("Unable to create response");
890					Box::new(future::ok(res))
891				}
892				"/loop" => {
893					let res = hyper::Response::builder()
894						.status(StatusCode::MOVED_PERMANENTLY)
895						.header(hyper::header::LOCATION, "/loop")
896						.body(hyper::Body::empty())
897						.expect("Unable to create response");
898					Box::new(future::ok(res))
899				}
900				"/delay" => {
901					let dur = Duration::from_secs(req.uri().query().unwrap_or("0").parse().unwrap());
902					let delayed_res = Delay::new(std::time::Instant::now() + dur)
903						.and_then(|_| Ok::<_, _>(hyper::Response::new(hyper::Body::empty())))
904						.from_err();
905					Box::new(delayed_res)
906				}
907				_ => {
908					let res = hyper::Response::builder()
909						.status(StatusCode::NOT_FOUND)
910						.body(hyper::Body::empty())
911						.expect("Unable to create response");
912					Box::new(future::ok(res))
913				}
914			}
915		}
916	}
917
918	impl TestServer {
919		fn run() -> Handle {
920			let (tx_start, rx_start) = std::sync::mpsc::sync_channel(1);
921			let (tx_end, rx_end) = oneshot::channel();
922			let rx_end_fut = rx_end.map(|_| ()).map_err(|_| ());
923			thread::spawn(move || {
924				let addr = ADDRESS.parse().unwrap();
925
926				let server = hyper::server::Server::bind(&addr)
927					.serve(|| future::ok::<_, hyper::Error>(TestServer));
928
929				tx_start.send(server.local_addr()).unwrap_or(());
930
931				tokio::run(
932					server.with_graceful_shutdown(rx_end_fut)
933						.map_err(|e| panic!("server error: {}", e))
934				);
935			});
936
937			Handle(rx_start.recv().unwrap(), Some(tx_end))
938		}
939	}
940
941	struct Handle(SocketAddr, Option<oneshot::Sender<()>>);
942
943	impl Handle {
944		fn addr(&self) -> SocketAddr {
945			self.0
946		}
947	}
948
949	impl Drop for Handle {
950		fn drop(&mut self) {
951			self.1.take().unwrap().send(()).unwrap();
952		}
953	}
954}