reqwest_resume/
lib.rs

1//! Wrapper that uses the `Range` HTTP header to resume get requests.
2//!
3//! <p style="font-family: 'Fira Sans',sans-serif;padding:0.3em 0"><strong>
4//! <a href="https://crates.io/crates/reqwest_resume">📦&nbsp;&nbsp;Crates.io</a>&nbsp;&nbsp;│&nbsp;&nbsp;<a href="https://github.com/alecmocatta/reqwest_resume">📑&nbsp;&nbsp;GitHub</a>&nbsp;&nbsp;│&nbsp;&nbsp;<a href="https://constellation.zulipchat.com/#narrow/stream/213236-subprojects">💬&nbsp;&nbsp;Chat</a>
5//! </strong></p>
6//!
7//! It's a thin wrapper around [`reqwest`](https://github.com/seanmonstar/reqwest). It's a work in progress – wrapping functionality is copied across on an as-needed basis. Feel free to open a PR/issue if you need something.
8//!
9//! # Example
10//!
11//! ```
12//! use async_compression::futures::bufread::GzipDecoder;
13//! use futures::{io::BufReader, AsyncBufReadExt, StreamExt, TryStreamExt};
14//! use std::io;
15//!
16//! # #[tokio::main]
17//! # async fn main() {
18//! let url = "http://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2018-30/warc.paths.gz";
19//! let body = reqwest_resume::get(url.parse().unwrap()).await.unwrap();
20//! // Content-Encoding isn't set, so decode manually
21//! let body = body
22//!     .bytes_stream()
23//!     .map_err(|e| io::Error::new(io::ErrorKind::Other, e));
24//! let body = BufReader::new(body.into_async_read());
25//! let mut body = GzipDecoder::new(body); // Content-Encoding isn't set, so decode manually
26//! body.multiple_members(true);
27//!
28//! let mut lines = BufReader::new(body).lines();
29//! while let Some(line) = lines.next().await {
30//!     println!("{}", line.unwrap());
31//! }
32//! # }
33//! ```
34
35#![doc(html_root_url = "https://docs.rs/reqwest_resume/0.3.2")]
36#![warn(
37	missing_copy_implementations,
38	missing_debug_implementations,
39	missing_docs,
40	trivial_casts,
41	trivial_numeric_casts,
42	unused_import_braces,
43	unused_qualifications,
44	unused_results,
45	clippy::pedantic
46)] // from https://github.com/rust-unofficial/patterns/blob/master/anti_patterns/deny-warnings.md
47#![allow(
48	clippy::new_without_default,
49	clippy::must_use_candidate,
50	clippy::missing_errors_doc
51)]
52
53use bytes::Bytes;
54use futures::{ready, FutureExt, Stream, TryFutureExt};
55use std::{
56	future::Future, pin::Pin, task::{Context, Poll}, time::Duration
57};
58use tokio::time::delay_for as sleep;
59
60/// Extension to [`reqwest::Client`] that provides a method to convert it
61pub trait ClientExt {
62	/// Convert a [`reqwest::Client`] into a [`reqwest_resume::Client`](Client)
63	fn resumable(self) -> Client;
64}
65impl ClientExt for reqwest::Client {
66	fn resumable(self) -> Client {
67		Client(self)
68	}
69}
70
71/// A `Client` to make Requests with.
72///
73/// See [`reqwest::Client`].
74#[derive(Debug)]
75pub struct Client(reqwest::Client);
76impl Client {
77	/// Constructs a new `Client`.
78	///
79	/// See [`reqwest::Client::new()`].
80	pub fn new() -> Self {
81		Self(reqwest::Client::new())
82	}
83	/// Convenience method to make a `GET` request to a URL.
84	///
85	/// See [`reqwest::Client::get()`].
86	pub fn get(&self, url: reqwest::Url) -> RequestBuilder {
87		// <U: reqwest::IntoUrl>
88		RequestBuilder(self.0.clone(), reqwest::Method::GET, url)
89	}
90}
91
92/// A builder to construct the properties of a Request.
93///
94/// See [`reqwest::RequestBuilder`].
95#[derive(Debug)]
96pub struct RequestBuilder(reqwest::Client, reqwest::Method, reqwest::Url);
97impl RequestBuilder {
98	/// Constructs the Request and sends it the target URL, returning a Response.
99	///
100	/// See [`reqwest::RequestBuilder::send()`].
101	pub fn send(&mut self) -> impl Future<Output = reqwest::Result<Response>> + Send {
102		let (client, method, url) = (self.0.clone(), self.1.clone(), self.2.clone());
103		async move {
104			let response = loop {
105				let builder = client.request(method.clone(), url.clone());
106				match builder.send().await {
107					Err(err) if !err.is_builder() && !err.is_redirect() && !err.is_status() => {
108						sleep(Duration::from_secs(1)).await
109					}
110					x => break x?,
111				}
112			};
113			let headers = hyperx::Headers::from(response.headers());
114			let accept_byte_ranges =
115				if let Some(&hyperx::header::AcceptRanges(ref ranges)) = headers.get() {
116					ranges
117						.iter()
118						.any(|u| *u == hyperx::header::RangeUnit::Bytes)
119				} else {
120					false
121				};
122			Ok(Response {
123				client,
124				method,
125				url,
126				response,
127				accept_byte_ranges,
128				pos: 0,
129			})
130		}
131	}
132}
133
134/// A Response to a submitted Request.
135///
136/// See [`reqwest::Response`].
137#[derive(Debug)]
138pub struct Response {
139	client: reqwest::Client,
140	method: reqwest::Method,
141	url: reqwest::Url,
142	response: reqwest::Response,
143	accept_byte_ranges: bool,
144	pos: u64,
145}
146impl Response {
147	/// Convert the response into a `Stream` of `Bytes` from the body.
148	///
149	/// See [`reqwest::Response::bytes_stream()`].
150	pub fn bytes_stream(self) -> impl Stream<Item = reqwest::Result<Bytes>> + Send {
151		Decoder {
152			client: self.client,
153			method: self.method,
154			url: self.url,
155			decoder: Box::pin(self.response.bytes_stream()),
156			accept_byte_ranges: self.accept_byte_ranges,
157			pos: self.pos,
158		}
159	}
160}
161
162struct Decoder {
163	client: reqwest::Client,
164	method: reqwest::Method,
165	url: reqwest::Url,
166	decoder: Pin<Box<dyn Stream<Item = reqwest::Result<Bytes>> + Send + Unpin>>,
167	accept_byte_ranges: bool,
168	pos: u64,
169}
170impl Stream for Decoder {
171	type Item = reqwest::Result<Bytes>;
172
173	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
174		loop {
175			match ready!(self.decoder.as_mut().poll_next(cx)) {
176				Some(Err(err)) => {
177					if !self.accept_byte_ranges {
178						// TODO: we could try, for those servers that don't output Accept-Ranges but work anyway
179						break Poll::Ready(Some(Err(err)));
180					}
181					let builder = self.client.request(self.method.clone(), self.url.clone());
182					let mut headers = hyperx::Headers::new();
183					headers.set(hyperx::header::Range::Bytes(vec![
184						hyperx::header::ByteRangeSpec::AllFrom(self.pos),
185					]));
186					let builder = builder.headers(headers.into());
187					// https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests
188					// https://github.com/sdroege/gst-plugin-rs/blob/dcb36832329fde0113a41b80ebdb5efd28ead68d/gst-plugin-http/src/httpsrc.rs
189					self.decoder = Box::pin(
190						sleep(Duration::from_secs(1))
191							.then(|()| builder.send())
192							.map_ok(reqwest::Response::bytes_stream)
193							.try_flatten_stream(),
194					);
195				}
196				Some(Ok(n)) => {
197					self.pos += n.len() as u64;
198					break Poll::Ready(Some(Ok(n)));
199				}
200				None => break Poll::Ready(None),
201			}
202		}
203	}
204}
205
206/// Shortcut method to quickly make a GET request.
207///
208/// See [`reqwest::get`].
209pub fn get(url: reqwest::Url) -> impl Future<Output = reqwest::Result<Response>> + Send {
210	// <T: IntoUrl>
211	Client::new().get(url).send()
212}
213
#[cfg(test)]
mod test {
	use async_compression::futures::bufread::GzipDecoder; // TODO: use stream or https://github.com/alexcrichton/flate2-rs/pull/214
	use futures::{future::join_all, io::BufReader, AsyncBufReadExt, StreamExt, TryStreamExt};
	use std::io;

	/// Fetches the gzipped path index with plain `reqwest`, then downloads the
	/// first listed file through this crate's `get` and decompresses it fully.
	#[tokio::test]
	async fn dl_s3() {
		// Requests to large files on S3 regularly time out or close when made from slower connections. This test is fairly meaningless from fast connections. TODO
		let index = reqwest::get(
			"http://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2018-30/warc.paths.gz",
		)
		.await
		.unwrap()
		.bytes_stream()
		.map_err(|e| io::Error::new(io::ErrorKind::Other, e));
		// Content-Encoding isn't set, so decode manually
		let mut index = GzipDecoder::new(BufReader::new(index.into_async_read()));
		index.multiple_members(true);

		let tasks: Vec<_> = BufReader::new(index)
			.lines()
			.map(|path| format!("http://commoncrawl.s3.amazonaws.com/{}", path.unwrap()))
			.take(1) // painful to do more on CI, but unlikely to see errors. TODO
			.map(|url| {
				tokio::spawn(async move {
					println!("{}", url);
					let archive = super::get(url.parse().unwrap())
						.await
						.unwrap()
						.bytes_stream()
						.map_err(|e| io::Error::new(io::ErrorKind::Other, e));
					// Content-Encoding isn't set, so decode manually
					let mut archive = GzipDecoder::new(BufReader::new(archive.into_async_read()));
					archive.multiple_members(true);
					// Drain the whole decompressed body and report its size.
					let copied = futures::io::copy(&mut archive, &mut futures::io::sink())
						.await
						.unwrap();
					println!("{}", copied);
				})
			})
			.collect()
			.await;

		// Surface a panic from any spawned download task.
		for outcome in join_all(tasks).await {
			outcome.unwrap();
		}
	}
}