1use governor::clock::QuantaClock;
2use governor::state::keyed::DashMapStateStore;
3use governor::RateLimiter;
4use reqwest::{Client, StatusCode};
5use std::collections::hash_map::DefaultHasher;
6use std::hash::{Hash, Hasher};
7use std::path::PathBuf;
8use std::sync::Arc;
9use std::time::Duration;
10use thiserror::Error;
11use tokio_retry::strategy::ExponentialBackoff;
12use tokio_retry::RetryIf;
13
14use crate::archive::ArchiveRecord;
15use crate::cdx::create_archive_url;
16
/// User-Agent sent with every request, e.g. "netrunner/<version>".
static APP_USER_AGENT: &str = concat!("netrunner", "/", env!("CARGO_PKG_VERSION"));
/// Fallback delay (milliseconds) used when a 429 response carries no
/// parseable `Retry-After` header.
const RETRY_DELAY_MS: u64 = 5000;

/// Keyed (per-domain) rate limiter shared across crawl tasks.
pub type RateLimit = RateLimiter<String, DashMapStateStore<String>, QuantaClock>;
21
/// Errors produced while fetching and archiving a single page.
#[derive(Error, Debug)]
pub enum FetchError {
    /// The HTTP response could not be converted into an `ArchiveRecord`.
    #[error("Unable to create ArchiveRecord")]
    ArchiveError,
    /// The server answered 429; the caller's retry logic may try again.
    #[error("Too Many Requests")]
    TooManyRequests,
    /// A response was received but carried a non-success status code.
    #[error("HTTP status error: {0}")]
    HttpError(reqwest::Error),
    /// The request itself failed (connect error, timeout, etc.).
    #[error("Request error: {0}")]
    RequestError(reqwest::Error),
}
33
34pub fn http_client() -> Client {
35 reqwest::Client::builder()
37 .gzip(true)
38 .user_agent(APP_USER_AGENT)
39 .connect_timeout(Duration::from_secs(1))
40 .timeout(Duration::from_secs(10))
41 .build()
42 .expect("Unable to create HTTP client")
43}
44
45fn should_retry(e: &FetchError) -> bool {
48 match e {
49 FetchError::HttpError(err) => {
50 if let Some(status_code) = err.status() {
51 status_code.as_u16() != 403 && status_code != 404
52 } else {
53 true
54 }
55 }
56 _ => true,
57 }
58}
59
60pub async fn handle_crawl(
62 client: &Client,
63 tmp_storage: Option<PathBuf>,
64 lim: Arc<RateLimit>,
65 url: &url::Url,
66) -> anyhow::Result<ArchiveRecord, FetchError> {
67 let ia_url = create_archive_url(url.as_ref());
69
70 let domain = url.domain().expect("No domain in URL");
71
72 let retry_strat = ExponentialBackoff::from_millis(100)
73 .max_delay(Duration::from_secs(5))
74 .take(3);
75
76 let web_archive = RetryIf::spawn(
78 retry_strat.clone(),
79 || async {
80 log::info!("trying to fetch from IA");
81 lim.until_key_ready(&domain.to_string()).await;
83 fetch_page(client, &ia_url, Some(url.to_string()), tmp_storage.clone()).await
84 },
85 should_retry,
86 )
87 .await;
88
89 if web_archive.is_err() {
92 let retry_strat = ExponentialBackoff::from_millis(100)
93 .max_delay(Duration::from_secs(5))
94 .take(3);
95
96 RetryIf::spawn(
97 retry_strat,
98 || async {
99 log::info!("trying to fetch from origin");
100 lim.until_key_ready(&domain.to_string()).await;
102 fetch_page(client, url.as_ref(), None, tmp_storage.clone()).await
103 },
104 should_retry,
105 )
106 .await
107 } else {
108 Ok(web_archive.unwrap())
109 }
110}
111
112async fn fetch_page(
113 client: &Client,
114 url: &str,
115 url_override: Option<String>,
116 page_store: Option<PathBuf>,
117) -> anyhow::Result<ArchiveRecord, FetchError> {
118 match client.get(url).send().await {
120 Ok(resp) => {
121 if resp.status() == StatusCode::TOO_MANY_REQUESTS {
122 let retry_after_ms: u64 =
123 resp.headers()
124 .get("Retry-After")
125 .map_or(RETRY_DELAY_MS, |header| {
126 if let Ok(header) = header.to_str() {
127 log::warn!("found Retry-After: {}", header);
128 header.parse::<u64>().unwrap_or(RETRY_DELAY_MS)
129 } else {
130 RETRY_DELAY_MS
131 }
132 });
133
134 log::info!("429 received... retrying after {}ms", retry_after_ms);
135 tokio::time::sleep(tokio::time::Duration::from_millis(retry_after_ms)).await;
136
137 Err(FetchError::TooManyRequests)
138 } else if let Err(err) = resp.error_for_status_ref() {
139 log::warn!("Unable to fetch [{:?}] {} - {}", err.status(), url, err);
140 Err(FetchError::HttpError(err))
141 } else {
142 match ArchiveRecord::from_response(resp, url_override).await {
143 Ok(record) => {
144 if let Some(page_store) = page_store {
145 if let Ok(serialized) = ron::to_string(&record) {
146 let mut hasher = DefaultHasher::new();
147 record.url.hash(&mut hasher);
148 let id = hasher.finish().to_string();
149 let file = page_store.join(id);
150 let _ = std::fs::write(file.clone(), serialized);
151 log::debug!("cached <{}> -> <{}>", record.url, file.display());
152 }
153 }
154 Ok(record)
155 }
156 Err(err) => {
157 log::error!("Unable to create ArchiveRecord: {err}");
158 Err(FetchError::ArchiveError)
159 }
160 }
161 }
162 }
163 Err(err) => {
164 log::warn!("Unable to fetch [{:?}] {} - {}", err.status(), url, err);
165 Err(FetchError::RequestError(err))
166 }
167 }
168}
169
#[cfg(test)]
mod test {
    use super::{handle_crawl, http_client};
    use governor::{Quota, RateLimiter};
    use nonzero_ext::nonzero;
    use std::io;
    use std::sync::Arc;
    use tracing_log::LogTracer;
    use tracing_subscriber::EnvFilter;
    use tracing_subscriber::{fmt, prelude::__tracing_subscriber_SubscriberExt};
    use url::Url;

    // Live-network smoke test for `handle_crawl`; ignored by default so
    // CI stays offline. Run explicitly with `cargo test -- --ignored`.
    #[tokio::test]
    #[ignore = "live http request"]
    async fn test_handle_404() {
        // Route `log` records through tracing so crawl progress is
        // visible when the test is run with debug logging enabled.
        let subscriber = tracing_subscriber::registry()
            .with(
                EnvFilter::from_default_env()
                    .add_directive("libnetrunner=DEBUG".parse().expect("Invalid log filter")),
            )
            .with(fmt::Layer::new().with_writer(io::stdout));
        tracing::subscriber::set_global_default(subscriber)
            .expect("Unable to set a global subscriber");
        LogTracer::init().expect("Unable to create logger");

        let client = http_client();
        // Allow at most 2 requests per second per domain key.
        let quota = Quota::per_second(nonzero!(2u32));
        let lim = Arc::new(RateLimiter::<String, _, _>::keyed(quota));

        // NOTE(review): despite the `404` in the test name, this only
        // asserts the crawl succeeds end-to-end — confirm intent.
        let url = Url::parse(
            "https://developers.home-assistant.io/blog/2020/05/08/logos-custom-integrations",
        )
        .expect("Invalid URL");

        assert!(handle_crawl(&client, None, lim.clone(), &url).await.is_ok());
    }
}
208}