1use super::{
2 item::UrlInfo,
3 util::{retry_future, Retryable},
4 Item,
5};
6use bytes::{Buf, Bytes};
7use reqwest::{header::LOCATION, redirect, Client, StatusCode};
8use std::time::Duration;
9use thiserror::Error;
10use tryhard::RetryPolicy;
11
// --- Retry tuning -----------------------------------------------------------

/// Value returned by `Error::max_retries`.
const MAX_RETRIES: u32 = 7;

/// Value returned by `Error::default_initial_delay`.
const RETRY_INITIAL_DELAY_DURATION: Duration = Duration::from_millis(250);

/// Fixed delay applied before retrying after a `502 Bad Gateway` response.
const BAD_GATEWAY_DELAY_DURATION: Duration = Duration::from_secs(30);

// --- HTTP client tuning -----------------------------------------------------

/// TCP keep-alive setting handed to the HTTP client builder.
const TCP_KEEPALIVE_DURATION: Duration = Duration::from_secs(20);

/// Request timeout used by `Downloader::default`.
const DEFAULT_REQUEST_TIMEOUT_DURATION: Duration = Duration::from_secs(10);
17
18#[derive(Error, Debug)]
19pub enum Error {
20 #[error("I/O error")]
21 Io(#[from] std::io::Error),
22 #[error("HTTP client error: {0:?}")]
23 Client(#[from] reqwest::Error),
24 #[error("Unexpected redirect: {0:?}")]
25 UnexpectedRedirect(Option<String>),
26 #[error("Unexpected redirect URL: {0:?}")]
27 UnexpectedRedirectUrl(String),
28 #[error("Unexpected status code: {0:?}")]
29 UnexpectedStatus(StatusCode),
30 #[error("Invalid UTF-8: {0:?}")]
31 InvalidUtf8(#[from] std::str::Utf8Error),
32}
33
34impl Retryable for Error {
35 fn max_retries() -> u32 {
36 MAX_RETRIES
37 }
38
39 fn log_level() -> Option<log::Level> {
40 Some(log::Level::Warn)
41 }
42
43 fn default_initial_delay() -> Duration {
44 RETRY_INITIAL_DELAY_DURATION
45 }
46
47 fn custom_retry_policy(&self) -> Option<RetryPolicy> {
48 match self {
49 Error::Io(_) => None,
50 Error::Client(_) => None,
51 Error::UnexpectedStatus(StatusCode::BAD_GATEWAY) => {
53 Some(RetryPolicy::Delay(BAD_GATEWAY_DELAY_DURATION))
54 }
55 _ => Some(RetryPolicy::Break),
56 }
57 }
58}
59
60#[derive(Debug, Eq, PartialEq)]
61pub struct RedirectResolution {
62 pub url: String,
63 pub timestamp: String,
64 pub content: Bytes,
65 pub valid_initial_content: bool,
66 pub valid_digest: bool,
67}
68
69#[derive(Clone)]
70pub struct Downloader {
71 client: Client,
72}
73
74impl Downloader {
75 pub fn new(request_timeout: Duration) -> reqwest::Result<Self> {
76 let tcp_keepalive = Some(TCP_KEEPALIVE_DURATION);
77
78 Ok(Self {
79 client: Client::builder()
80 .timeout(request_timeout)
81 .tcp_keepalive(tcp_keepalive)
82 .redirect(redirect::Policy::none())
83 .build()?,
84 })
85 }
86
87 fn wayback_url(url: &str, timestamp: &str, original: bool) -> String {
88 format!(
89 "https://web.archive.org/web/{}{}/{}",
90 timestamp,
91 if original { "id_" } else { "if_" },
92 url
93 )
94 }
95
96 pub async fn resolve_redirect(
97 &self,
98 url: &str,
99 timestamp: &str,
100 expected_digest: &str,
101 ) -> Result<RedirectResolution, Error> {
102 let initial_url = Self::wayback_url(url, timestamp, true);
103 let initial_response = self.client.head(&initial_url).send().await?;
104
105 match initial_response.status() {
106 StatusCode::FOUND => {
107 match initial_response
108 .headers()
109 .get(LOCATION)
110 .and_then(|value| value.to_str().ok())
111 .map(str::to_string)
112 {
113 Some(location) => {
114 let info = location
115 .parse::<UrlInfo>()
116 .map_err(|_| Error::UnexpectedRedirectUrl(location))?;
117
118 let guess = super::util::redirect::guess_redirect_content(&info.url);
119 let mut guess_bytes = guess.as_bytes();
120 let guess_digest = super::digest::compute_digest(&mut guess_bytes)?;
121
122 let mut valid_initial_content = true;
123 let mut valid_digest = true;
124
125 let content = if guess_digest == expected_digest {
126 Bytes::from(guess)
127 } else {
128 log::warn!("Invalid guess, re-requesting");
129 let direct_bytes =
130 self.client.get(&initial_url).send().await?.bytes().await?;
131 let direct_digest =
132 super::digest::compute_digest(&mut direct_bytes.clone().reader())?;
133 valid_initial_content = false;
134 valid_digest = direct_digest == expected_digest;
135
136 direct_bytes
137 };
138
139 let actual_url = self
140 .direct_resolve_redirect(&info.url, &info.timestamp)
141 .await?;
142
143 let actual_info = actual_url
144 .parse::<UrlInfo>()
145 .map_err(|_| Error::UnexpectedRedirectUrl(actual_url))?;
146
147 Ok(RedirectResolution {
148 url: actual_info.url,
149 timestamp: actual_info.timestamp,
150 content,
151 valid_initial_content,
152 valid_digest,
153 })
154 }
155 None => Err(Error::UnexpectedRedirect(None)),
156 }
157 }
158 other => Err(Error::UnexpectedStatus(other)),
159 }
160 }
161
162 async fn direct_resolve_redirect(&self, url: &str, timestamp: &str) -> Result<String, Error> {
163 let response = self
164 .client
165 .head(Self::wayback_url(url, timestamp, true))
166 .send()
167 .await?;
168
169 match response.status() {
170 StatusCode::FOUND => {
171 match response
172 .headers()
173 .get(LOCATION)
174 .and_then(|value| value.to_str().ok())
175 .map(str::to_string)
176 {
177 Some(location) => Ok(location),
178 None => Err(Error::UnexpectedRedirect(None)),
179 }
180 }
181 other => Err(Error::UnexpectedStatus(other)),
182 }
183 }
184
185 pub async fn resolve_redirect_shallow(
186 &self,
187 url: &str,
188 timestamp: &str,
189 expected_digest: &str,
190 ) -> Result<(UrlInfo, String, bool), Error> {
191 let initial_url = Self::wayback_url(url, timestamp, true);
192 let initial_response = self.client.head(&initial_url).send().await?;
193
194 match initial_response.status() {
195 StatusCode::FOUND => {
196 match initial_response
197 .headers()
198 .get(LOCATION)
199 .and_then(|value| value.to_str().ok())
200 .map(str::to_string)
201 {
202 Some(location) => {
203 let info = location
204 .parse::<UrlInfo>()
205 .map_err(|_| Error::UnexpectedRedirectUrl(location))?;
206
207 let guess = super::util::redirect::guess_redirect_content(&info.url);
208 let mut guess_bytes = guess.as_bytes();
209 let guess_digest = super::digest::compute_digest(&mut guess_bytes)?;
210
211 let (content, valid_digest) = if guess_digest == expected_digest {
212 (guess, true)
213 } else {
214 log::warn!("Invalid guess, re-requesting");
215 let direct_bytes =
216 self.client.get(&initial_url).send().await?.bytes().await?;
217 let direct_digest =
218 super::digest::compute_digest(&mut direct_bytes.clone().reader())?;
219 (
220 std::str::from_utf8(&direct_bytes)?.to_string(),
221 direct_digest == expected_digest,
222 )
223 };
224
225 Ok((info, content, valid_digest))
226 }
227 None => Err(Error::UnexpectedRedirect(None)),
228 }
229 }
230 other => Err(Error::UnexpectedStatus(other)),
231 }
232 }
233
234 async fn download(&self, url: &str, timestamp: &str, original: bool) -> Result<Bytes, Error> {
235 retry_future(|| self.download_once(url, timestamp, original)).await
236 }
237
238 async fn download_once(
239 &self,
240 url: &str,
241 timestamp: &str,
242 original: bool,
243 ) -> Result<Bytes, Error> {
244 let response = self
245 .client
246 .get(Self::wayback_url(url, timestamp, original))
247 .send()
248 .await?;
249
250 match response.status() {
251 StatusCode::OK => Ok(response.bytes().await?),
252 other => Err(Error::UnexpectedStatus(other)),
253 }
254 }
255
256 pub async fn download_item(&self, item: &Item) -> Result<Bytes, Error> {
257 self.download(&item.url, &item.timestamp(), true).await
258 }
259}
260
261impl Default for Downloader {
262 fn default() -> Self {
263 Self::new(DEFAULT_REQUEST_TIMEOUT_DURATION).unwrap()
264 }
265}