// libnetrunner/crawler.rs

1use governor::clock::QuantaClock;
2use governor::state::keyed::DashMapStateStore;
3use governor::RateLimiter;
4use reqwest::{Client, StatusCode};
5use std::collections::hash_map::DefaultHasher;
6use std::hash::{Hash, Hasher};
7use std::path::PathBuf;
8use std::sync::Arc;
9use std::time::Duration;
10use thiserror::Error;
11use tokio_retry::strategy::ExponentialBackoff;
12use tokio_retry::RetryIf;
13
14use crate::archive::ArchiveRecord;
15use crate::cdx::create_archive_url;
16
/// User-agent sent with every request, e.g. "netrunner/0.1.0" (version from Cargo).
static APP_USER_AGENT: &str = concat!("netrunner", "/", env!("CARGO_PKG_VERSION"));
/// Fallback delay (in milliseconds) used after a 429 when no usable
/// Retry-After header is present.
const RETRY_DELAY_MS: u64 = 5000;

/// Keyed rate limiter shared across crawl tasks; keyed by domain string so
/// each domain is throttled independently.
pub type RateLimit = RateLimiter<String, DashMapStateStore<String>, QuantaClock>;
21
/// Errors produced while fetching and archiving a single page.
#[derive(Error, Debug)]
pub enum FetchError {
    /// `ArchiveRecord::from_response` failed to build a record from the response.
    #[error("Unable to create ArchiveRecord")]
    ArchiveError,
    /// Server responded with HTTP 429; `fetch_page` sleeps for the requested
    /// delay before returning this, so a retry can fire immediately.
    #[error("Too Many Requests")]
    TooManyRequests,
    /// Response carried a non-success HTTP status (from `error_for_status_ref`).
    #[error("HTTP status error: {0}")]
    HttpError(reqwest::Error),
    /// The request itself failed to complete (e.g. connect error or timeout).
    #[error("Request error: {0}")]
    RequestError(reqwest::Error),
}
33
34pub fn http_client() -> Client {
35    // Use a normal user-agent otherwise some sites won't let us crawl
36    reqwest::Client::builder()
37        .gzip(true)
38        .user_agent(APP_USER_AGENT)
39        .connect_timeout(Duration::from_secs(1))
40        .timeout(Duration::from_secs(10))
41        .build()
42        .expect("Unable to create HTTP client")
43}
44
45/// Checks to see if we should retry a FetchError based on legitimate issues versus
46/// something like a 404 or 403 which would happen everytime.
47fn should_retry(e: &FetchError) -> bool {
48    match e {
49        FetchError::HttpError(err) => {
50            if let Some(status_code) = err.status() {
51                status_code.as_u16() != 403 && status_code != 404
52            } else {
53                true
54            }
55        }
56        _ => true,
57    }
58}
59
60/// Handles crawling a url.
61pub async fn handle_crawl(
62    client: &Client,
63    tmp_storage: Option<PathBuf>,
64    lim: Arc<RateLimit>,
65    url: &url::Url,
66) -> anyhow::Result<ArchiveRecord, FetchError> {
67    // URL to Wayback Machine
68    let ia_url = create_archive_url(url.as_ref());
69
70    let domain = url.domain().expect("No domain in URL");
71
72    let retry_strat = ExponentialBackoff::from_millis(100)
73        .max_delay(Duration::from_secs(5))
74        .take(3);
75
76    // Retry if we run into 429 / timeout errors
77    let web_archive = RetryIf::spawn(
78        retry_strat.clone(),
79        || async {
80            log::info!("trying to fetch from IA");
81            // Wait for when we can crawl this based on the domain
82            lim.until_key_ready(&domain.to_string()).await;
83            fetch_page(client, &ia_url, Some(url.to_string()), tmp_storage.clone()).await
84        },
85        should_retry,
86    )
87    .await;
88
89    // If we fail trying to get the page from the web archive, hit the
90    // site directly.
91    if web_archive.is_err() {
92        let retry_strat = ExponentialBackoff::from_millis(100)
93            .max_delay(Duration::from_secs(5))
94            .take(3);
95
96        RetryIf::spawn(
97            retry_strat,
98            || async {
99                log::info!("trying to fetch from origin");
100                // Wait for when we can crawl this based on the domain
101                lim.until_key_ready(&domain.to_string()).await;
102                fetch_page(client, url.as_ref(), None, tmp_storage.clone()).await
103            },
104            should_retry,
105        )
106        .await
107    } else {
108        Ok(web_archive.unwrap())
109    }
110}
111
112async fn fetch_page(
113    client: &Client,
114    url: &str,
115    url_override: Option<String>,
116    page_store: Option<PathBuf>,
117) -> anyhow::Result<ArchiveRecord, FetchError> {
118    // Wait for when we can crawl this based on the domain
119    match client.get(url).send().await {
120        Ok(resp) => {
121            if resp.status() == StatusCode::TOO_MANY_REQUESTS {
122                let retry_after_ms: u64 =
123                    resp.headers()
124                        .get("Retry-After")
125                        .map_or(RETRY_DELAY_MS, |header| {
126                            if let Ok(header) = header.to_str() {
127                                log::warn!("found Retry-After: {}", header);
128                                header.parse::<u64>().unwrap_or(RETRY_DELAY_MS)
129                            } else {
130                                RETRY_DELAY_MS
131                            }
132                        });
133
134                log::info!("429 received... retrying after {}ms", retry_after_ms);
135                tokio::time::sleep(tokio::time::Duration::from_millis(retry_after_ms)).await;
136
137                Err(FetchError::TooManyRequests)
138            } else if let Err(err) = resp.error_for_status_ref() {
139                log::warn!("Unable to fetch [{:?}] {} - {}", err.status(), url, err);
140                Err(FetchError::HttpError(err))
141            } else {
142                match ArchiveRecord::from_response(resp, url_override).await {
143                    Ok(record) => {
144                        if let Some(page_store) = page_store {
145                            if let Ok(serialized) = ron::to_string(&record) {
146                                let mut hasher = DefaultHasher::new();
147                                record.url.hash(&mut hasher);
148                                let id = hasher.finish().to_string();
149                                let file = page_store.join(id);
150                                let _ = std::fs::write(file.clone(), serialized);
151                                log::debug!("cached <{}> -> <{}>", record.url, file.display());
152                            }
153                        }
154                        Ok(record)
155                    }
156                    Err(err) => {
157                        log::error!("Unable to create ArchiveRecord: {err}");
158                        Err(FetchError::ArchiveError)
159                    }
160                }
161            }
162        }
163        Err(err) => {
164            log::warn!("Unable to fetch [{:?}] {} - {}", err.status(), url, err);
165            Err(FetchError::RequestError(err))
166        }
167    }
168}
169
#[cfg(test)]
mod test {
    use super::{handle_crawl, http_client};
    use governor::{Quota, RateLimiter};
    use nonzero_ext::nonzero;
    use std::io;
    use std::sync::Arc;
    use tracing_log::LogTracer;
    use tracing_subscriber::EnvFilter;
    use tracing_subscriber::{fmt, prelude::__tracing_subscriber_SubscriberExt};
    use url::Url;

    /// Exercises the origin-site fallback in `handle_crawl`: the page below is
    /// expected to 404 in the web archive but succeed when fetched directly.
    /// Marked `#[ignore]` because it performs live HTTP requests.
    #[tokio::test]
    #[ignore = "live http request"]
    async fn test_handle_404() {
        // Setup some nice console logging
        // (the global subscriber must be installed before LogTracer bridges
        // `log` records into `tracing`; order matters here).
        let subscriber = tracing_subscriber::registry()
            .with(
                EnvFilter::from_default_env()
                    .add_directive("libnetrunner=DEBUG".parse().expect("Invalid log filter")),
            )
            .with(fmt::Layer::new().with_writer(io::stdout));
        tracing::subscriber::set_global_default(subscriber)
            .expect("Unable to set a global subscriber");
        LogTracer::init().expect("Unable to create logger");

        // Rate limit: at most 2 requests per second per domain key.
        let client = http_client();
        let quota = Quota::per_second(nonzero!(2u32));
        let lim = Arc::new(RateLimiter::<String, _, _>::keyed(quota));

        // Known to 404 in the web archive, but actually exists.
        let url = Url::parse(
            "https://developers.home-assistant.io/blog/2020/05/08/logos-custom-integrations",
        )
        .expect("Invalid URL");

        assert!(handle_crawl(&client, None, lim.clone(), &url).await.is_ok());
    }
}