wayback_rs/
downloader.rs

1use super::{
2    item::UrlInfo,
3    util::{retry_future, Retryable},
4    Item,
5};
6use bytes::{Buf, Bytes};
7use reqwest::{header::LOCATION, redirect, Client, StatusCode};
8use std::time::Duration;
9use thiserror::Error;
10use tryhard::RetryPolicy;
11
// --- Retry tuning ---------------------------------------------------------

/// Upper bound on retry attempts, reported through `Retryable::max_retries`.
const MAX_RETRIES: u32 = 7;
/// Initial delay between retries, reported through
/// `Retryable::default_initial_delay`.
const RETRY_INITIAL_DELAY_DURATION: Duration = Duration::from_millis(250);
/// Fixed delay applied before retrying after a 502 Bad Gateway response.
const BAD_GATEWAY_DELAY_DURATION: Duration = Duration::from_secs(30);

// --- HTTP client tuning ---------------------------------------------------

/// TCP keep-alive setting handed to the HTTP client builder.
const TCP_KEEPALIVE_DURATION: Duration = Duration::from_secs(20);
/// Request timeout used when a `Downloader` is built through `Default`.
const DEFAULT_REQUEST_TIMEOUT_DURATION: Duration = Duration::from_secs(10);
17
18#[derive(Error, Debug)]
19pub enum Error {
20    #[error("I/O error")]
21    Io(#[from] std::io::Error),
22    #[error("HTTP client error: {0:?}")]
23    Client(#[from] reqwest::Error),
24    #[error("Unexpected redirect: {0:?}")]
25    UnexpectedRedirect(Option<String>),
26    #[error("Unexpected redirect URL: {0:?}")]
27    UnexpectedRedirectUrl(String),
28    #[error("Unexpected status code: {0:?}")]
29    UnexpectedStatus(StatusCode),
30    #[error("Invalid UTF-8: {0:?}")]
31    InvalidUtf8(#[from] std::str::Utf8Error),
32}
33
34impl Retryable for Error {
35    fn max_retries() -> u32 {
36        MAX_RETRIES
37    }
38
39    fn log_level() -> Option<log::Level> {
40        Some(log::Level::Warn)
41    }
42
43    fn default_initial_delay() -> Duration {
44        RETRY_INITIAL_DELAY_DURATION
45    }
46
47    fn custom_retry_policy(&self) -> Option<RetryPolicy> {
48        match self {
49            Error::Io(_) => None,
50            Error::Client(_) => None,
51            // 502 (often Too Many Requests)
52            Error::UnexpectedStatus(StatusCode::BAD_GATEWAY) => {
53                Some(RetryPolicy::Delay(BAD_GATEWAY_DELAY_DURATION))
54            }
55            _ => Some(RetryPolicy::Break),
56        }
57    }
58}
59
60#[derive(Debug, Eq, PartialEq)]
61pub struct RedirectResolution {
62    pub url: String,
63    pub timestamp: String,
64    pub content: Bytes,
65    pub valid_initial_content: bool,
66    pub valid_digest: bool,
67}
68
69#[derive(Clone)]
70pub struct Downloader {
71    client: Client,
72}
73
74impl Downloader {
75    pub fn new(request_timeout: Duration) -> reqwest::Result<Self> {
76        let tcp_keepalive = Some(TCP_KEEPALIVE_DURATION);
77
78        Ok(Self {
79            client: Client::builder()
80                .timeout(request_timeout)
81                .tcp_keepalive(tcp_keepalive)
82                .redirect(redirect::Policy::none())
83                .build()?,
84        })
85    }
86
87    fn wayback_url(url: &str, timestamp: &str, original: bool) -> String {
88        format!(
89            "https://web.archive.org/web/{}{}/{}",
90            timestamp,
91            if original { "id_" } else { "if_" },
92            url
93        )
94    }
95
96    pub async fn resolve_redirect(
97        &self,
98        url: &str,
99        timestamp: &str,
100        expected_digest: &str,
101    ) -> Result<RedirectResolution, Error> {
102        let initial_url = Self::wayback_url(url, timestamp, true);
103        let initial_response = self.client.head(&initial_url).send().await?;
104
105        match initial_response.status() {
106            StatusCode::FOUND => {
107                match initial_response
108                    .headers()
109                    .get(LOCATION)
110                    .and_then(|value| value.to_str().ok())
111                    .map(str::to_string)
112                {
113                    Some(location) => {
114                        let info = location
115                            .parse::<UrlInfo>()
116                            .map_err(|_| Error::UnexpectedRedirectUrl(location))?;
117
118                        let guess = super::util::redirect::guess_redirect_content(&info.url);
119                        let mut guess_bytes = guess.as_bytes();
120                        let guess_digest = super::digest::compute_digest(&mut guess_bytes)?;
121
122                        let mut valid_initial_content = true;
123                        let mut valid_digest = true;
124
125                        let content = if guess_digest == expected_digest {
126                            Bytes::from(guess)
127                        } else {
128                            log::warn!("Invalid guess, re-requesting");
129                            let direct_bytes =
130                                self.client.get(&initial_url).send().await?.bytes().await?;
131                            let direct_digest =
132                                super::digest::compute_digest(&mut direct_bytes.clone().reader())?;
133                            valid_initial_content = false;
134                            valid_digest = direct_digest == expected_digest;
135
136                            direct_bytes
137                        };
138
139                        let actual_url = self
140                            .direct_resolve_redirect(&info.url, &info.timestamp)
141                            .await?;
142
143                        let actual_info = actual_url
144                            .parse::<UrlInfo>()
145                            .map_err(|_| Error::UnexpectedRedirectUrl(actual_url))?;
146
147                        Ok(RedirectResolution {
148                            url: actual_info.url,
149                            timestamp: actual_info.timestamp,
150                            content,
151                            valid_initial_content,
152                            valid_digest,
153                        })
154                    }
155                    None => Err(Error::UnexpectedRedirect(None)),
156                }
157            }
158            other => Err(Error::UnexpectedStatus(other)),
159        }
160    }
161
162    async fn direct_resolve_redirect(&self, url: &str, timestamp: &str) -> Result<String, Error> {
163        let response = self
164            .client
165            .head(Self::wayback_url(url, timestamp, true))
166            .send()
167            .await?;
168
169        match response.status() {
170            StatusCode::FOUND => {
171                match response
172                    .headers()
173                    .get(LOCATION)
174                    .and_then(|value| value.to_str().ok())
175                    .map(str::to_string)
176                {
177                    Some(location) => Ok(location),
178                    None => Err(Error::UnexpectedRedirect(None)),
179                }
180            }
181            other => Err(Error::UnexpectedStatus(other)),
182        }
183    }
184
185    pub async fn resolve_redirect_shallow(
186        &self,
187        url: &str,
188        timestamp: &str,
189        expected_digest: &str,
190    ) -> Result<(UrlInfo, String, bool), Error> {
191        let initial_url = Self::wayback_url(url, timestamp, true);
192        let initial_response = self.client.head(&initial_url).send().await?;
193
194        match initial_response.status() {
195            StatusCode::FOUND => {
196                match initial_response
197                    .headers()
198                    .get(LOCATION)
199                    .and_then(|value| value.to_str().ok())
200                    .map(str::to_string)
201                {
202                    Some(location) => {
203                        let info = location
204                            .parse::<UrlInfo>()
205                            .map_err(|_| Error::UnexpectedRedirectUrl(location))?;
206
207                        let guess = super::util::redirect::guess_redirect_content(&info.url);
208                        let mut guess_bytes = guess.as_bytes();
209                        let guess_digest = super::digest::compute_digest(&mut guess_bytes)?;
210
211                        let (content, valid_digest) = if guess_digest == expected_digest {
212                            (guess, true)
213                        } else {
214                            log::warn!("Invalid guess, re-requesting");
215                            let direct_bytes =
216                                self.client.get(&initial_url).send().await?.bytes().await?;
217                            let direct_digest =
218                                super::digest::compute_digest(&mut direct_bytes.clone().reader())?;
219                            (
220                                std::str::from_utf8(&direct_bytes)?.to_string(),
221                                direct_digest == expected_digest,
222                            )
223                        };
224
225                        Ok((info, content, valid_digest))
226                    }
227                    None => Err(Error::UnexpectedRedirect(None)),
228                }
229            }
230            other => Err(Error::UnexpectedStatus(other)),
231        }
232    }
233
234    async fn download(&self, url: &str, timestamp: &str, original: bool) -> Result<Bytes, Error> {
235        retry_future(|| self.download_once(url, timestamp, original)).await
236    }
237
238    async fn download_once(
239        &self,
240        url: &str,
241        timestamp: &str,
242        original: bool,
243    ) -> Result<Bytes, Error> {
244        let response = self
245            .client
246            .get(Self::wayback_url(url, timestamp, original))
247            .send()
248            .await?;
249
250        match response.status() {
251            StatusCode::OK => Ok(response.bytes().await?),
252            other => Err(Error::UnexpectedStatus(other)),
253        }
254    }
255
256    pub async fn download_item(&self, item: &Item) -> Result<Bytes, Error> {
257        self.download(&item.url, &item.timestamp(), true).await
258    }
259}
260
261impl Default for Downloader {
262    fn default() -> Self {
263        Self::new(DEFAULT_REQUEST_TIMEOUT_DURATION).unwrap()
264    }
265}