cortex_runtime/acquisition/
http_client.rs1use anyhow::Result;
7use std::time::Duration;
8
/// Result of a GET or form POST issued through [`HttpClient`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HttpResponse {
    /// URL exactly as passed in by the caller.
    pub url: String,
    /// URL the response actually came from, i.e. after any redirects were followed.
    pub final_url: String,
    /// HTTP status code of the final response.
    pub status: u16,
    /// Response headers as `(name, value)` pairs. `HttpClient::get` keeps only a fixed
    /// subset of headers, `HttpClient::post_form` keeps all of them. Values rejected by
    /// `HeaderValue::to_str` are stored as an empty string.
    pub headers: Vec<(String, String)>,
    /// Response body decoded as text; empty if the body could not be read.
    pub body: String,
}

/// Metadata gathered from a HEAD request made by `HttpClient::head_many`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HeadResponse {
    /// URL the HEAD request was sent to, as given by the caller.
    pub url: String,
    /// HTTP status code of the final response.
    pub status: u16,
    /// Raw `Content-Type` header, if present and representable as a string.
    pub content_type: Option<String>,
    /// Raw `Content-Language` header, if present and representable as a string.
    pub content_language: Option<String>,
    /// Raw (unparsed) `Last-Modified` header, if present and representable as a string.
    pub last_modified: Option<String>,
    /// Raw (unparsed) `Cache-Control` header, if present and representable as a string.
    pub cache_control: Option<String>,
}

41#[derive(Clone)]
43pub struct HttpClient {
44 client: reqwest::Client,
45 h1_client: reqwest::Client,
47}
48
49impl HttpClient {
50 pub fn new(timeout_ms: u64) -> Self {
52 let ua = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) \
53 AppleWebKit/537.36 (KHTML, like Gecko) \
54 Chrome/145.0.0.0 Safari/537.36";
55
56 let client = reqwest::Client::builder()
57 .timeout(Duration::from_millis(timeout_ms))
58 .redirect(reqwest::redirect::Policy::limited(5))
59 .user_agent(ua)
60 .build()
61 .unwrap_or_default();
62
63 let h1_client = reqwest::Client::builder()
64 .timeout(Duration::from_millis(timeout_ms))
65 .redirect(reqwest::redirect::Policy::limited(5))
66 .user_agent(ua)
67 .http1_only()
68 .build()
69 .unwrap_or_default();
70
71 Self { client, h1_client }
72 }
73
74 pub async fn get(&self, url: &str, timeout_ms: u64) -> Result<HttpResponse> {
78 match self.get_inner(&self.client, url, timeout_ms).await {
79 Ok(resp) => Ok(resp),
80 Err(e) => {
81 let err_str = format!("{e}");
83 if err_str.contains("http2")
84 || err_str.contains("protocol")
85 || err_str.contains("connection closed")
86 {
87 self.get_inner(&self.h1_client, url, timeout_ms).await
88 } else {
89 Err(e)
90 }
91 }
92 }
93 }
94
95 async fn get_inner(
96 &self,
97 client: &reqwest::Client,
98 url: &str,
99 timeout_ms: u64,
100 ) -> Result<HttpResponse> {
101 let mut retries = 0u32;
102 let max_retries = 2;
103
104 loop {
105 let resp = client
106 .get(url)
107 .timeout(Duration::from_millis(timeout_ms))
108 .send()
109 .await;
110
111 match resp {
112 Ok(r) => {
113 let status = r.status().as_u16();
114 let final_url = r.url().to_string();
115
116 if status >= 500 && retries < max_retries {
118 retries += 1;
119 let delay = Duration::from_millis(500 * 2u64.pow(retries - 1));
120 tokio::time::sleep(delay).await;
121 continue;
122 }
123
124 if status == 429 && retries < max_retries {
126 retries += 1;
127 let retry_after = r
128 .headers()
129 .get("retry-after")
130 .and_then(|v| v.to_str().ok())
131 .and_then(|s| s.parse::<u64>().ok())
132 .unwrap_or(2);
133 let delay = Duration::from_secs(retry_after.min(10));
134 tokio::time::sleep(delay).await;
135 continue;
136 }
137
138 let headers: Vec<(String, String)> = r
139 .headers()
140 .iter()
141 .filter(|(k, _)| {
142 matches!(
143 k.as_str(),
144 "content-type"
145 | "content-language"
146 | "last-modified"
147 | "cache-control"
148 | "x-robots-tag"
149 )
150 })
151 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
152 .collect();
153
154 let body = r.text().await.unwrap_or_default();
155
156 return Ok(HttpResponse {
157 url: url.to_string(),
158 final_url,
159 status,
160 headers,
161 body,
162 });
163 }
164 Err(e) => {
165 if retries < max_retries {
166 retries += 1;
167 let delay = Duration::from_millis(500 * 2u64.pow(retries - 1));
168 tokio::time::sleep(delay).await;
169 continue;
170 }
171 return Err(e.into());
172 }
173 }
174 }
175 }
176
177 pub async fn post_form(
182 &self,
183 url: &str,
184 form_fields: &[(String, String)],
185 extra_headers: &[(String, String)],
186 timeout_ms: u64,
187 ) -> Result<HttpResponse> {
188 let mut builder = self
189 .client
190 .post(url)
191 .timeout(Duration::from_millis(timeout_ms));
192
193 for (name, value) in extra_headers {
194 builder = builder.header(name.as_str(), value.as_str());
195 }
196
197 builder = builder.form(form_fields);
198
199 let r = builder.send().await?;
200 let status = r.status().as_u16();
201 let final_url = r.url().to_string();
202
203 let headers: Vec<(String, String)> = r
204 .headers()
205 .iter()
206 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
207 .collect();
208
209 let body = r.text().await.unwrap_or_default();
210
211 Ok(HttpResponse {
212 url: url.to_string(),
213 final_url,
214 status,
215 headers,
216 body,
217 })
218 }
219
220 pub async fn get_many(
222 &self,
223 urls: &[String],
224 concurrency: usize,
225 timeout_ms: u64,
226 ) -> Vec<Result<HttpResponse>> {
227 use futures::stream::{self, StreamExt};
228
229 let results: Vec<Result<HttpResponse>> = stream::iter(urls.iter())
230 .map(|url| {
231 let client = self.clone();
232 let u = url.clone();
233 let t = timeout_ms;
234 async move { client.get(&u, t).await }
235 })
236 .buffer_unordered(concurrency)
237 .collect()
238 .await;
239
240 results
241 }
242
243 pub async fn head_many(
245 &self,
246 urls: &[String],
247 concurrency: usize,
248 ) -> Vec<Result<HeadResponse>> {
249 use futures::stream::{self, StreamExt};
250
251 let results: Vec<Result<HeadResponse>> = stream::iter(urls.iter())
252 .map(|url| {
253 let client = self.client.clone();
254 let u = url.clone();
255 async move {
256 let resp = client
257 .head(&u)
258 .timeout(Duration::from_secs(10))
259 .send()
260 .await?;
261
262 let status = resp.status().as_u16();
263 let content_type = resp
264 .headers()
265 .get("content-type")
266 .and_then(|v| v.to_str().ok())
267 .map(|s| s.to_string());
268 let content_language = resp
269 .headers()
270 .get("content-language")
271 .and_then(|v| v.to_str().ok())
272 .map(|s| s.to_string());
273 let last_modified = resp
274 .headers()
275 .get("last-modified")
276 .and_then(|v| v.to_str().ok())
277 .map(|s| s.to_string());
278 let cache_control = resp
279 .headers()
280 .get("cache-control")
281 .and_then(|v| v.to_str().ok())
282 .map(|s| s.to_string());
283
284 Ok(HeadResponse {
285 url: u,
286 status,
287 content_type,
288 content_language,
289 last_modified,
290 cache_control,
291 })
292 }
293 })
294 .buffer_unordered(concurrency)
295 .collect()
296 .await;
297
298 results
299 }
300}
301
#[cfg(test)]
mod tests {
    use super::*;

    /// Constructing a client (both underlying reqwest clients) must not panic.
    #[test]
    fn test_http_client_creation() {
        let _built = HttpClient::new(10_000);
    }

    /// A hand-built `HeadResponse` exposes the status it was given.
    #[test]
    fn test_head_response_defaults() {
        let expected_status: u16 = 200;
        let head = HeadResponse {
            url: String::from("https://example.com"),
            status: expected_status,
            content_type: Some(String::from("text/html")),
            content_language: None,
            last_modified: None,
            cache_control: None,
        };
        assert_eq!(head.status, expected_status);
    }
}