1use futures::future::{self, Loop};
18use futures::sync::{mpsc, oneshot};
19use futures::{self, Future, Async, Sink, Stream};
20use hyper::header::{self, HeaderMap, HeaderValue, IntoHeaderName};
21use hyper::{self, Method, StatusCode};
22use hyper_rustls;
23use std;
24use std::cmp::min;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
27use std::sync::mpsc::RecvTimeoutError;
28use std::thread;
29use std::time::Duration;
30use std::{io, fmt};
31use tokio::{self, util::FutureExt};
32use url::{self, Url};
33use bytes::Bytes;
34
/// Default upper bound on the size of a response body, in bytes (64 MiB).
const MAX_SIZE: usize = 64 * 1024 * 1024;
/// Default upper bound on the duration of a single fetch (5 seconds).
const MAX_SECS: Duration = Duration::from_secs(5);
/// Default upper bound on the number of redirects followed for one fetch.
const MAX_REDR: usize = 5;
38
/// Cancellation flag plus the limits that apply to a single fetch.
///
/// The flag lives behind an `Arc`, so every clone of an `Abort` observes an
/// `abort()` issued through any other clone.
#[derive(Clone, Debug)]
pub struct Abort {
    /// Shared flag; `true` once the fetch has been aborted.
    abort: Arc<AtomicBool>,
    /// Maximum response body size in bytes.
    size: usize,
    /// Maximum duration of the fetch.
    time: Duration,
    /// Maximum number of redirects to follow.
    redir: usize,
}
51
52impl Default for Abort {
53 fn default() -> Abort {
54 Abort {
55 abort: Arc::new(AtomicBool::new(false)),
56 size: MAX_SIZE,
57 time: MAX_SECS,
58 redir: MAX_REDR,
59 }
60 }
61}
62
63impl From<Arc<AtomicBool>> for Abort {
64 fn from(a: Arc<AtomicBool>) -> Abort {
65 Abort {
66 abort: a,
67 size: MAX_SIZE,
68 time: MAX_SECS,
69 redir: MAX_REDR,
70 }
71 }
72}
73
74impl Abort {
75 pub fn is_aborted(&self) -> bool {
77 self.abort.load(Ordering::SeqCst)
78 }
79
80 pub fn max_size(&self) -> usize {
82 self.size
83 }
84
85 pub fn max_duration(&self) -> Duration {
87 self.time
88 }
89
90 pub fn max_redirects(&self) -> usize {
92 self.redir
93 }
94
95 pub fn abort(&self) {
97 self.abort.store(true, Ordering::SeqCst)
98 }
99
100 pub fn with_max_size(self, n: usize) -> Abort {
102 Abort { size: n, .. self }
103 }
104
105 pub fn with_max_duration(self, d: Duration) -> Abort {
107 Abort { time: d, .. self }
108 }
109
110 pub fn with_max_redirects(self, n: usize) -> Abort {
112 Abort { redir: n, .. self }
113 }
114}
115
/// Something that can retrieve a `Response` for a request.
pub trait Fetch: Clone + Send + Sync + 'static {
    /// The future returned by every fetch operation; resolves to a
    /// `Response` or fails with an `Error`.
    type Result: Future<Item = Response, Error = Error> + Send + 'static;

    /// Issues `request`. `abort` carries the cancellation flag and the
    /// limits (size, duration, redirects) for this fetch.
    fn fetch(&self, request: Request, abort: Abort) -> Self::Result;

    /// Fetches `url`, given as a string, with the GET method.
    fn get(&self, url: &str, abort: Abort) -> Self::Result;

    /// Fetches `url`, given as a string, with the POST method.
    fn post(&self, url: &str, abort: Abort) -> Self::Result;
}
130
/// Sending half used by the background thread to deliver the outcome of one
/// request back to the caller.
type TxResponse = oneshot::Sender<Result<Response, Error>>;
/// Sending half used by the background thread to report whether its runtime
/// started successfully.
type TxStartup = std::sync::mpsc::SyncSender<Result<(), tokio::io::Error>>;
/// Message sent to the background thread: `Some(..)` is a request to run,
/// `None` tells the thread to stop processing requests.
type ChanItem = Option<(Request, Abort, TxResponse)>;
134
/// Handle to the background "fetch" thread that performs the HTTP requests.
///
/// Cloning yields another handle to the same thread; when the last handle is
/// dropped a `None` item is sent to make the thread stop.
#[derive(Debug)]
pub struct Client {
    /// Channel used to hand requests (or the stop signal) to the background thread.
    runtime: mpsc::Sender<ChanItem>,
    /// Number of live `Client` handles sharing the background thread.
    refs: Arc<AtomicUsize>,
}
144
145impl Clone for Client {
147 fn clone(&self) -> Client {
148 self.refs.fetch_add(1, Ordering::SeqCst);
149 Client {
150 runtime: self.runtime.clone(),
151 refs: self.refs.clone(),
152 }
153 }
154}
155
156impl Drop for Client {
159 fn drop(&mut self) {
160 if self.refs.fetch_sub(1, Ordering::SeqCst) == 1 {
161 let _ = self.runtime.clone().send(None).wait();
163 }
164 }
165}
166
167impl Client {
168 pub fn new(num_dns_threads: usize) -> Result<Self, Error> {
170 let (tx_start, rx_start) = std::sync::mpsc::sync_channel(1);
171 let (tx_proto, rx_proto) = mpsc::channel(64);
172
173 Client::background_thread(tx_start, rx_proto, num_dns_threads)?;
174
175 match rx_start.recv_timeout(Duration::from_secs(10)) {
176 Err(RecvTimeoutError::Timeout) => {
177 error!(target: "fetch", "timeout starting background thread");
178 return Err(Error::BackgroundThreadDead)
179 }
180 Err(RecvTimeoutError::Disconnected) => {
181 error!(target: "fetch", "background thread gone");
182 return Err(Error::BackgroundThreadDead)
183 }
184 Ok(Err(e)) => {
185 error!(target: "fetch", "error starting background thread: {}", e);
186 return Err(e.into())
187 }
188 Ok(Ok(())) => {}
189 }
190
191 Ok(Client {
192 runtime: tx_proto,
193 refs: Arc::new(AtomicUsize::new(1)),
194 })
195 }
196
197 fn background_thread(tx_start: TxStartup, rx_proto: mpsc::Receiver<ChanItem>, num_dns_threads: usize) -> io::Result<thread::JoinHandle<()>> {
198 thread::Builder::new().name("fetch".into()).spawn(move || {
199 let mut runtime = match tokio::runtime::current_thread::Runtime::new() {
200 Ok(c) => c,
201 Err(e) => return tx_start.send(Err(e)).unwrap_or(())
202 };
203
204 let hyper = hyper::Client::builder()
205 .build(hyper_rustls::HttpsConnector::new(num_dns_threads));
206
207 let future = rx_proto.take_while(|item| Ok(item.is_some()))
208 .map(|item| item.expect("`take_while` is only passing on channel items != None; qed"))
209 .for_each(|(request, abort, sender)|
210 {
211 trace!(target: "fetch", "new request to {}", request.url());
212 if abort.is_aborted() {
213 return future::ok(sender.send(Err(Error::Aborted)).unwrap_or(()))
214 }
215 let ini = (hyper.clone(), request, abort, 0);
216 let fut = future::loop_fn(ini, |(client, request, abort, redirects)| {
217 let request2 = request.clone();
218 let url2 = request2.url().clone();
219 let abort2 = abort.clone();
220 client.request(request.into())
221 .map(move |resp| Response::new(url2, resp, abort2))
222 .from_err()
223 .and_then(move |resp| {
224 if abort.is_aborted() {
225 debug!(target: "fetch", "fetch of {} aborted", request2.url());
226 return Err(Error::Aborted)
227 }
228 if let Some((next_url, preserve_method)) = redirect_location(request2.url().clone(), &resp) {
229 if redirects >= abort.max_redirects() {
230 return Err(Error::TooManyRedirects)
231 }
232 let request = if preserve_method {
233 let mut request2 = request2.clone();
234 request2.set_url(next_url);
235 request2
236 } else {
237 Request::new(next_url, Method::GET)
238 };
239 Ok(Loop::Continue((client, request, abort, redirects + 1)))
240 } else {
241 if let Some(ref h_val) = resp.headers.get(header::CONTENT_LENGTH) {
242 let content_len = h_val
243 .to_str()?
244 .parse::<u64>()?;
245
246 if content_len > abort.max_size() as u64 {
247 return Err(Error::SizeLimit)
248 }
249 }
250 Ok(Loop::Break(resp))
251 }
252 })
253 })
254 .then(|result| {
255 future::ok(sender.send(result).unwrap_or(()))
256 });
257 tokio::spawn(fut);
258 trace!(target: "fetch", "waiting for next request ...");
259 future::ok(())
260 });
261
262 tx_start.send(Ok(())).unwrap_or(());
263
264 debug!(target: "fetch", "processing requests ...");
265 if let Err(()) = runtime.block_on(future) {
266 error!(target: "fetch", "error while executing future")
267 }
268 debug!(target: "fetch", "fetch background thread finished")
269 })
270 }
271}
272
273impl Fetch for Client {
274 type Result = Box<dyn Future<Item=Response, Error=Error> + Send + 'static>;
275
276 fn fetch(&self, request: Request, abort: Abort) -> Self::Result {
277 debug!(target: "fetch", "fetching: {:?}", request.url());
278 if abort.is_aborted() {
279 return Box::new(future::err(Error::Aborted))
280 }
281 let (tx_res, rx_res) = oneshot::channel();
282 let maxdur = abort.max_duration();
283 let sender = self.runtime.clone();
284 let future = sender.send(Some((request, abort, tx_res)))
285 .map_err(|e| {
286 error!(target: "fetch", "failed to schedule request: {}", e);
287 Error::BackgroundThreadDead
288 })
289 .and_then(|_| rx_res.map_err(|oneshot::Canceled| Error::BackgroundThreadDead))
290 .and_then(future::result);
291
292 Box::new(future.timeout(maxdur)
293 .map_err(|err| {
294 if err.is_inner() {
295 Error::from(err.into_inner().unwrap())
296 } else {
297 Error::from(err)
298 }
299 })
300 )
301 }
302
303 fn get(&self, url: &str, abort: Abort) -> Self::Result {
305 let url: Url = match url.parse() {
306 Ok(u) => u,
307 Err(e) => return Box::new(future::err(e.into()))
308 };
309 self.fetch(Request::get(url), abort)
310 }
311
312 fn post(&self, url: &str, abort: Abort) -> Self::Result {
314 let url: Url = match url.parse() {
315 Ok(u) => u,
316 Err(e) => return Box::new(future::err(e.into()))
317 };
318 self.fetch(Request::post(url), abort)
319 }
320}
321
322fn redirect_location(u: Url, r: &Response) -> Option<(Url, bool)> {
324 let preserve_method = match r.status() {
325 StatusCode::TEMPORARY_REDIRECT | StatusCode::PERMANENT_REDIRECT => true,
326 _ => false,
327 };
328 match r.status() {
329 StatusCode::MOVED_PERMANENTLY
330 | StatusCode::PERMANENT_REDIRECT
331 | StatusCode::TEMPORARY_REDIRECT
332 | StatusCode::FOUND
333 | StatusCode::SEE_OTHER => {
334 r.headers.get(header::LOCATION).and_then(|loc| {
335 loc.to_str().ok().and_then(|loc_s| {
336 u.join(loc_s).ok().map(|url| (url, preserve_method))
337 })
338 })
339 }
340 _ => None
341 }
342}
343
/// An HTTP request description: target URL, method, headers and body.
#[derive(Debug, Clone)]
pub struct Request {
    /// Target URL.
    url: Url,
    /// HTTP method.
    method: Method,
    /// Request headers.
    headers: HeaderMap,
    /// Request body; empty unless set.
    body: Bytes,
}
352
353impl Request {
354 pub fn new(url: Url, method: Method) -> Request {
356 Request {
357 url, method,
358 headers: HeaderMap::new(),
359 body: Default::default(),
360 }
361 }
362
363 pub fn get(url: Url) -> Request {
365 Request::new(url, Method::GET)
366 }
367
368 pub fn post(url: Url) -> Request {
370 Request::new(url, Method::POST)
371 }
372
373 pub fn url(&self) -> &Url {
375 &self.url
376 }
377
378 pub fn headers(&self) -> &HeaderMap {
380 &self.headers
381 }
382
383 pub fn headers_mut(&mut self) -> &mut HeaderMap {
385 &mut self.headers
386 }
387
388 pub fn set_body<T: Into<Bytes>>(&mut self, body: T) {
390 self.body = body.into();
391 }
392
393 pub fn set_url(&mut self, url: Url) {
395 self.url = url;
396 }
397
398 pub fn with_header<K>(mut self, key: K, val: HeaderValue) -> Self
400 where K: IntoHeaderName,
401 {
402 self.headers_mut().append(key, val);
403 self
404 }
405
406 pub fn with_body<T: Into<Bytes>>(mut self, body: T) -> Self {
408 self.set_body(body);
409 self
410 }
411}
412
413impl From<Request> for hyper::Request<hyper::Body> {
414 fn from(req: Request) -> hyper::Request<hyper::Body> {
415 let uri: hyper::Uri = req.url.as_ref().parse().expect("Every valid URLis also a URI.");
416 hyper::Request::builder()
417 .method(req.method)
418 .uri(uri)
419 .header(header::USER_AGENT, HeaderValue::from_static("Tetsy Fetch Neo"))
420 .body(req.body.into())
421 .expect("Header, uri, method, and body are already valid and can not fail to parse; qed")
422 }
423}
424
/// An HTTP response whose body is consumed as a `Stream` of chunks.
#[derive(Debug)]
pub struct Response {
    /// URL of the request this response belongs to.
    url: Url,
    /// HTTP status code.
    status: StatusCode,
    /// Response headers.
    headers: HeaderMap,
    /// The not yet consumed response body.
    body: hyper::Body,
    /// Abort flag and limits checked while the body is read.
    abort: Abort,
    /// Number of body bytes handed out so far.
    nread: usize,
}
435
436impl Response {
437 pub fn new(u: Url, r: hyper::Response<hyper::Body>, a: Abort) -> Response {
439 Response {
440 url: u,
441 status: r.status(),
442 headers: r.headers().clone(),
443 body: r.into_body(),
444 abort: a,
445 nread: 0,
446 }
447 }
448
449 pub fn status(&self) -> StatusCode {
451 self.status
452 }
453
454 pub fn is_success(&self) -> bool {
456 self.status() == StatusCode::OK
457 }
458
459 pub fn is_not_found(&self) -> bool {
461 self.status() == StatusCode::NOT_FOUND
462 }
463
464 pub fn is_html(&self) -> bool {
466 self.headers.get(header::CONTENT_TYPE).and_then(|ct_val| {
467 ct_val.to_str().ok().map(|ct_str| {
468 ct_str.contains("text") && ct_str.contains("html")
469 })
470 }).unwrap_or(false)
471 }
472}
473
474impl Stream for Response {
475 type Item = hyper::Chunk;
476 type Error = Error;
477
478 fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
479 if self.abort.is_aborted() {
480 debug!(target: "fetch", "fetch of {} aborted", self.url);
481 return Err(Error::Aborted)
482 }
483 match try_ready!(self.body.poll()) {
484 None => Ok(Async::Ready(None)),
485 Some(c) => {
486 if self.nread + c.len() > self.abort.max_size() {
487 debug!(target: "fetch", "size limit {:?} for {} exceeded", self.abort.max_size(), self.url);
488 return Err(Error::SizeLimit)
489 }
490 self.nread += c.len();
491 Ok(Async::Ready(Some(c)))
492 }
493 }
494 }
495}
496
/// Blocking `io::Read` adapter over the body of a `Response`.
pub struct BodyReader {
    /// Chunk currently being copied out.
    chunk: hyper::Chunk,
    /// Remaining body; `None` once the body ended or a read failed.
    body: Option<hyper::Body>,
    /// Supplies the size limit enforced while reading.
    abort: Abort,
    /// Read position inside `chunk`.
    offset: usize,
    /// Total number of bytes copied out so far.
    count: usize,
}
508
509impl BodyReader {
510 pub fn new(r: Response) -> BodyReader {
512 BodyReader {
513 body: Some(r.body),
514 chunk: Default::default(),
515 abort: r.abort,
516 offset: 0,
517 count: 0,
518 }
519 }
520}
521
522impl io::Read for BodyReader {
523 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
524 let mut n = 0;
525 while self.body.is_some() {
526 if self.offset < self.chunk.len() {
528 let k = min(self.chunk.len() - self.offset, buf.len() - n);
529 if self.count + k > self.abort.max_size() {
530 debug!(target: "fetch", "size limit {:?} exceeded", self.abort.max_size());
531 return Err(io::Error::new(io::ErrorKind::PermissionDenied, "size limit exceeded"))
532 }
533 let c = &self.chunk[self.offset .. self.offset + k];
534 (&mut buf[n .. n + k]).copy_from_slice(c);
535 self.offset += k;
536 self.count += k;
537 n += k;
538 if n == buf.len() {
539 break
540 }
541 } else {
542 let body = self.body.take().expect("loop condition ensures `self.body` is always defined; qed");
543 match body.into_future().wait() { Err((e, _)) => {
545 error!(target: "fetch", "failed to read chunk: {}", e);
546 return Err(io::Error::new(io::ErrorKind::Other, "failed to read body chunk"))
547 }
548 Ok((None, _)) => break, Ok((Some(c), b)) => {
550 self.body = Some(b);
551 self.chunk = c;
552 self.offset = 0
553 }
554 }
555 }
556 }
557 Ok(n)
558 }
559}
560
/// Errors produced by the fetch client.
#[derive(Debug)]
pub enum Error {
    /// Error reported by hyper.
    Hyper(hyper::Error),
    /// A header value could not be read as a string.
    HyperHeaderToStrError(hyper::header::ToStrError),
    /// An integer could not be parsed (used for the `Content-Length` header).
    ParseInt(std::num::ParseIntError),
    /// I/O error.
    Io(io::Error),
    /// A URL could not be parsed.
    Url(url::ParseError),
    /// The request has been aborted.
    Aborted,
    /// More redirects were encountered than allowed.
    TooManyRedirects,
    /// Debug rendering of the inner error carried by a tokio timeout error.
    TokioTimeoutInnerVal(String),
    /// Error of the tokio timer itself, if available.
    TokioTimer(Option<tokio::timer::Error>),
    /// The request timed out.
    Timeout,
    /// The response exceeds the size limit.
    SizeLimit,
    /// The background thread is not available.
    BackgroundThreadDead,
}
589
590impl fmt::Display for Error {
591 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
592 match *self {
593 Error::Aborted => write!(fmt, "The request has been aborted."),
594 Error::Hyper(ref e) => write!(fmt, "{}", e),
595 Error::HyperHeaderToStrError(ref e) => write!(fmt, "{}", e),
596 Error::ParseInt(ref e) => write!(fmt, "{}", e),
597 Error::Url(ref e) => write!(fmt, "{}", e),
598 Error::Io(ref e) => write!(fmt, "{}", e),
599 Error::BackgroundThreadDead => write!(fmt, "background thread gond"),
600 Error::TooManyRedirects => write!(fmt, "too many redirects"),
601 Error::TokioTimeoutInnerVal(ref s) => write!(fmt, "tokio timer inner value error: {:?}", s),
602 Error::TokioTimer(ref e) => write!(fmt, "tokio timer error: {:?}", e),
603 Error::Timeout => write!(fmt, "request timed out"),
604 Error::SizeLimit => write!(fmt, "size limit reached"),
605 }
606 }
607}
608
609impl ::std::error::Error for Error {
610 fn description(&self) -> &str { "Fetch client error" }
611 fn cause(&self) -> Option<&dyn std::error::Error> { None }
612}
613
614impl From<hyper::Error> for Error {
615 fn from(e: hyper::Error) -> Self {
616 Error::Hyper(e)
617 }
618}
619
620impl From<hyper::header::ToStrError> for Error {
621 fn from(e: hyper::header::ToStrError) -> Self {
622 Error::HyperHeaderToStrError(e)
623 }
624}
625
626impl From<std::num::ParseIntError> for Error {
627 fn from(e: std::num::ParseIntError) -> Self {
628 Error::ParseInt(e)
629 }
630}
631
632impl From<io::Error> for Error {
633 fn from(e: io::Error) -> Self {
634 Error::Io(e)
635 }
636}
637
638impl From<url::ParseError> for Error {
639 fn from(e: url::ParseError) -> Self {
640 Error::Url(e)
641 }
642}
643
644impl<T: std::fmt::Debug> From<tokio::timer::timeout::Error<T>> for Error {
645 fn from(e: tokio::timer::timeout::Error<T>) -> Self {
646 if e.is_inner() {
647 Error::TokioTimeoutInnerVal(format!("{:?}", e.into_inner().unwrap()))
648 } else if e.is_elapsed() {
649 Error::Timeout
650 } else {
651 Error::TokioTimer(e.into_timer())
652 }
653 }
654}
655
656impl From<tokio::timer::Error> for Error {
657 fn from(e: tokio::timer::Error) -> Self {
658 Error::TokioTimer(Some(e))
659 }
660}
661
/// Integration-style tests that run the client against a local hyper server.
#[cfg(test)]
mod test {
    use super::*;
    use futures::future;
    use futures::sync::oneshot;
    use hyper::{
        StatusCode,
        service::Service,
    };
    use tokio::timer::Delay;
    use tokio::runtime::current_thread::Runtime;
    use std::io::Read;
    use std::net::SocketAddr;

    /// Bind to an OS-assigned port on localhost.
    const ADDRESS: &str = "127.0.0.1:0";

    #[test]
    fn it_should_fetch() {
        let server = TestServer::run();
        let client = Client::new(4).unwrap();
        let mut runtime = Runtime::new().unwrap();

        let future = client.get(&format!("http://{}?123", server.addr()), Abort::default())
            .map(|resp| {
                assert!(resp.is_success());
                resp
            })
            .map(|resp| resp.concat2())
            .flatten()
            .map(|body| assert_eq!(&body[..], b"123"))
            // Format the error: a non-string panic payload would not be printed.
            .map_err(|err| panic!("{:?}", err));

        runtime.block_on(future).unwrap();
    }

    #[test]
    fn it_should_fetch_in_light_mode() {
        let server = TestServer::run();
        let client = Client::new(1).unwrap();
        let mut runtime = Runtime::new().unwrap();

        let future = client.get(&format!("http://{}?123", server.addr()), Abort::default())
            .map(|resp| {
                assert!(resp.is_success());
                resp
            })
            .map(|resp| resp.concat2())
            .flatten()
            .map(|body| assert_eq!(&body[..], b"123"))
            // Format the error: a non-string panic payload would not be printed.
            .map_err(|err| panic!("{:?}", err));

        runtime.block_on(future).unwrap();
    }

    #[test]
    fn it_should_timeout() {
        let server = TestServer::run();
        let client = Client::new(4).unwrap();
        let mut runtime = Runtime::new().unwrap();

        let abort = Abort::default().with_max_duration(Duration::from_secs(1));

        let future = client.get(&format!("http://{}/delay?3", server.addr()), abort)
            .then(|res| {
                match res {
                    Err(Error::Timeout) => Ok::<_, ()>(()),
                    other => panic!("expected timeout, got {:?}", other),
                }
            });

        runtime.block_on(future).unwrap();
    }

    #[test]
    fn it_should_follow_redirects() {
        let server = TestServer::run();
        let client = Client::new(4).unwrap();
        let mut runtime = Runtime::new().unwrap();

        let abort = Abort::default();

        let future = client.get(&format!("http://{}/redirect?http://{}/", server.addr(), server.addr()), abort)
            .and_then(|resp| {
                if resp.is_success() { Ok(()) } else { panic!("Response unsuccessful") }
            });

        runtime.block_on(future).unwrap();
    }

    #[test]
    fn it_should_follow_relative_redirects() {
        let server = TestServer::run();
        let client = Client::new(4).unwrap();
        let mut runtime = Runtime::new().unwrap();

        let abort = Abort::default().with_max_redirects(4);
        let future = client.get(&format!("http://{}/redirect?/", server.addr()), abort)
            .and_then(|resp| {
                if resp.is_success() { Ok(()) } else { panic!("Response unsuccessful") }
            });

        runtime.block_on(future).unwrap();
    }

    #[test]
    fn it_should_not_follow_too_many_redirects() {
        let server = TestServer::run();
        let client = Client::new(4).unwrap();
        let mut runtime = Runtime::new().unwrap();

        let abort = Abort::default().with_max_redirects(3);
        let future = client.get(&format!("http://{}/loop", server.addr()), abort)
            .then(|res| {
                match res {
                    Err(Error::TooManyRedirects) => Ok::<_, ()>(()),
                    other => panic!("expected too many redirects error, got {:?}", other)
                }
            });

        runtime.block_on(future).unwrap();
    }

    #[test]
    fn it_should_read_data() {
        let server = TestServer::run();
        let client = Client::new(4).unwrap();
        let mut runtime = Runtime::new().unwrap();

        let abort = Abort::default();
        let future = client.get(&format!("http://{}?abcdefghijklmnopqrstuvwxyz", server.addr()), abort)
            .and_then(|resp| {
                if resp.is_success() { Ok(resp) } else { panic!("Response unsuccessful") }
            })
            .map(|resp| resp.concat2())
            .flatten()
            .map(|body| assert_eq!(&body[..], b"abcdefghijklmnopqrstuvwxyz"));

        runtime.block_on(future).unwrap();
    }

    #[test]
    fn it_should_not_read_too_much_data() {
        let server = TestServer::run();
        let client = Client::new(4).unwrap();
        let mut runtime = Runtime::new().unwrap();

        let abort = Abort::default().with_max_size(3);
        let future = client.get(&format!("http://{}/?1234", server.addr()), abort)
            .and_then(|resp| {
                if resp.is_success() { Ok(resp) } else { panic!("Response unsuccessful") }
            })
            .map(|resp| resp.concat2())
            .flatten()
            .then(|body| {
                match body {
                    Err(Error::SizeLimit) => Ok::<_, ()>(()),
                    other => panic!("expected size limit error, got {:?}", other),
                }
            });

        runtime.block_on(future).unwrap();
    }

    #[test]
    fn it_should_not_read_too_much_data_sync() {
        let server = TestServer::run();
        let client = Client::new(4).unwrap();
        let mut runtime = Runtime::new().unwrap();

        let abort = Abort::default().with_max_size(3);
        // FIXME: the `and_then` closure below is not expected to run. The
        // client rejects a response whose `Content-Length` exceeds the size
        // limit before handing it out, so the fetch itself fails with
        // `Error::SizeLimit` and the synchronous `BodyReader` path is not
        // exercised. (Presumably the test server announces the 4-byte body
        // via `Content-Length`; confirm.)
        let future = client.get(&format!("http://{}/?1234", server.addr()), abort)
            .and_then(|resp| {
                assert_eq!(true, false, "Unreachable. (see FIXME note)");
                assert!(resp.is_success());
                let mut buffer = Vec::new();
                let mut reader = BodyReader::new(resp);
                match reader.read_to_end(&mut buffer) {
                    Err(ref e) if e.kind() == io::ErrorKind::PermissionDenied => Ok(()),
                    other => panic!("expected size limit error, got {:?}", other)
                }
            });

        match runtime.block_on(future) {
            Err(Error::SizeLimit) => {},
            other => panic!("Expected `Error::SizeLimit`, got: {:?}", other),
        }
    }

    /// Minimal HTTP service used by the tests; behaviour depends on the path:
    ///
    /// * `/` echoes the query string as the body,
    /// * `/redirect` answers 301 with the query string as `Location`,
    /// * `/loop` answers 301 pointing at itself,
    /// * `/delay` answers with an empty body after `<query>` seconds,
    /// * anything else answers 404.
    struct TestServer;

    impl Service for TestServer {
        type ReqBody = hyper::Body;
        type ResBody = hyper::Body;
        type Error = Error;
        type Future = Box<dyn Future<Item=hyper::Response<Self::ResBody>, Error=Self::Error> + Send + 'static>;

        fn call(&mut self, req: hyper::Request<hyper::Body>) -> Self::Future {
            match req.uri().path() {
                "/" => {
                    let body = req.uri().query().unwrap_or("").to_string();
                    let res = hyper::Response::new(body.into());
                    Box::new(future::ok(res))
                }
                "/redirect" => {
                    let loc = req.uri().query().unwrap_or("/").to_string();
                    let res = hyper::Response::builder()
                        .status(StatusCode::MOVED_PERMANENTLY)
                        .header(hyper::header::LOCATION, loc)
                        .body(hyper::Body::empty())
                        .expect("Unable to create response");
                    Box::new(future::ok(res))
                }
                "/loop" => {
                    let res = hyper::Response::builder()
                        .status(StatusCode::MOVED_PERMANENTLY)
                        .header(hyper::header::LOCATION, "/loop")
                        .body(hyper::Body::empty())
                        .expect("Unable to create response");
                    Box::new(future::ok(res))
                }
                "/delay" => {
                    let dur = Duration::from_secs(req.uri().query().unwrap_or("0").parse().unwrap());
                    let delayed_res = Delay::new(std::time::Instant::now() + dur)
                        .and_then(|_| Ok::<_, _>(hyper::Response::new(hyper::Body::empty())))
                        .from_err();
                    Box::new(delayed_res)
                }
                _ => {
                    let res = hyper::Response::builder()
                        .status(StatusCode::NOT_FOUND)
                        .body(hyper::Body::empty())
                        .expect("Unable to create response");
                    Box::new(future::ok(res))
                }
            }
        }
    }

    impl TestServer {
        /// Starts the server on its own thread and returns a handle that
        /// knows the bound address and shuts the server down when dropped.
        fn run() -> Handle {
            let (tx_start, rx_start) = std::sync::mpsc::sync_channel(1);
            let (tx_end, rx_end) = oneshot::channel();
            let rx_end_fut = rx_end.map(|_| ()).map_err(|_| ());
            thread::spawn(move || {
                let addr = ADDRESS.parse().unwrap();

                let server = hyper::server::Server::bind(&addr)
                    .serve(|| future::ok::<_, hyper::Error>(TestServer));

                tx_start.send(server.local_addr()).unwrap_or(());

                tokio::run(
                    server.with_graceful_shutdown(rx_end_fut)
                        .map_err(|e| panic!("server error: {}", e))
                );
            });

            Handle(rx_start.recv().unwrap(), Some(tx_end))
        }
    }

    /// Address of a running test server plus the sender that triggers its
    /// graceful shutdown.
    struct Handle(SocketAddr, Option<oneshot::Sender<()>>);

    impl Handle {
        /// Address the server is listening on.
        fn addr(&self) -> SocketAddr {
            self.0
        }
    }

    impl Drop for Handle {
        fn drop(&mut self) {
            // Never panic in `drop`: this may run while a failed test is
            // already unwinding, and the server thread may be gone, in
            // which case the send fails. Both situations are harmless here.
            if let Some(shutdown) = self.1.take() {
                let _ = shutdown.send(());
            }
        }
    }
}