Skip to main content

cortex_runtime/acquisition/
http_client.rs

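//! Async HTTP client wrapping reqwest.
//!
//! Not a browser — just HTTP requests. Handles redirects and timeouts,
//! retries 5xx responses and transport failures with exponential backoff,
//! waits out 429 responses according to `Retry-After`, and falls back to
//! HTTP/1.1 when a GET fails with what looks like a protocol error.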
5
6use anyhow::Result;
7use std::time::Duration;
8
/// Response from an HTTP request made by [`HttpClient`].
///
/// Returned by `HttpClient::get` / `HttpClient::get_many` and by
/// `HttpClient::post_form`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HttpResponse {
    /// URL exactly as passed in by the caller.
    pub url: String,
    /// URL of the response that was returned, after any redirects.
    pub final_url: String,
    /// HTTP status code of that final response.
    pub status: u16,
    /// Response headers as `(name, value)` pairs (names are lowercase).
    ///
    /// `get` keeps only an allow-list (`content-type`, `content-language`,
    /// `last-modified`, `cache-control`, `x-robots-tag`); `post_form` keeps
    /// every header. Values that are not visible ASCII are stored as `""`.
    pub headers: Vec<(String, String)>,
    /// Response body decoded as text.
    pub body: String,
}
23
/// Response from an HTTP HEAD request, as produced by `HttpClient::head_many`.
///
/// Each header field is `None` when the header is absent or its value is not
/// visible ASCII.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HeadResponse {
    /// URL exactly as passed in by the caller.
    pub url: String,
    /// HTTP status code.
    pub status: u16,
    /// `Content-Type` header value.
    pub content_type: Option<String>,
    /// `Content-Language` header value.
    pub content_language: Option<String>,
    /// `Last-Modified` header value, unparsed.
    pub last_modified: Option<String>,
    /// `Cache-Control` header value, unparsed.
    pub cache_control: Option<String>,
}
40
41/// HTTP client for the acquisition engine.
42#[derive(Clone)]
43pub struct HttpClient {
44    client: reqwest::Client,
45    /// HTTP/1.1-only fallback client for sites that reject HTTP/2.
46    h1_client: reqwest::Client,
47}
48
49impl HttpClient {
50    /// Create a new HTTP client with standard Chrome user-agent.
51    pub fn new(timeout_ms: u64) -> Self {
52        let ua = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) \
53                  AppleWebKit/537.36 (KHTML, like Gecko) \
54                  Chrome/145.0.0.0 Safari/537.36";
55
56        let client = reqwest::Client::builder()
57            .timeout(Duration::from_millis(timeout_ms))
58            .redirect(reqwest::redirect::Policy::limited(5))
59            .user_agent(ua)
60            .build()
61            .unwrap_or_default();
62
63        let h1_client = reqwest::Client::builder()
64            .timeout(Duration::from_millis(timeout_ms))
65            .redirect(reqwest::redirect::Policy::limited(5))
66            .user_agent(ua)
67            .http1_only()
68            .build()
69            .unwrap_or_default();
70
71        Self { client, h1_client }
72    }
73
74    /// Perform a single GET request with retry on 5xx and backoff on 429.
75    ///
76    /// Falls back to HTTP/1.1 on protocol errors (some CDNs reject HTTP/2).
77    pub async fn get(&self, url: &str, timeout_ms: u64) -> Result<HttpResponse> {
78        match self.get_inner(&self.client, url, timeout_ms).await {
79            Ok(resp) => Ok(resp),
80            Err(e) => {
81                // If the error looks like a protocol issue, retry with HTTP/1.1
82                let err_str = format!("{e}");
83                if err_str.contains("http2")
84                    || err_str.contains("protocol")
85                    || err_str.contains("connection closed")
86                {
87                    self.get_inner(&self.h1_client, url, timeout_ms).await
88                } else {
89                    Err(e)
90                }
91            }
92        }
93    }
94
95    async fn get_inner(
96        &self,
97        client: &reqwest::Client,
98        url: &str,
99        timeout_ms: u64,
100    ) -> Result<HttpResponse> {
101        let mut retries = 0u32;
102        let max_retries = 2;
103
104        loop {
105            let resp = client
106                .get(url)
107                .timeout(Duration::from_millis(timeout_ms))
108                .send()
109                .await;
110
111            match resp {
112                Ok(r) => {
113                    let status = r.status().as_u16();
114                    let final_url = r.url().to_string();
115
116                    // Retry on 5xx
117                    if status >= 500 && retries < max_retries {
118                        retries += 1;
119                        let delay = Duration::from_millis(500 * 2u64.pow(retries - 1));
120                        tokio::time::sleep(delay).await;
121                        continue;
122                    }
123
124                    // Backoff on 429
125                    if status == 429 && retries < max_retries {
126                        retries += 1;
127                        let retry_after = r
128                            .headers()
129                            .get("retry-after")
130                            .and_then(|v| v.to_str().ok())
131                            .and_then(|s| s.parse::<u64>().ok())
132                            .unwrap_or(2);
133                        let delay = Duration::from_secs(retry_after.min(10));
134                        tokio::time::sleep(delay).await;
135                        continue;
136                    }
137
138                    let headers: Vec<(String, String)> = r
139                        .headers()
140                        .iter()
141                        .filter(|(k, _)| {
142                            matches!(
143                                k.as_str(),
144                                "content-type"
145                                    | "content-language"
146                                    | "last-modified"
147                                    | "cache-control"
148                                    | "x-robots-tag"
149                            )
150                        })
151                        .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
152                        .collect();
153
154                    let body = r.text().await.unwrap_or_default();
155
156                    return Ok(HttpResponse {
157                        url: url.to_string(),
158                        final_url,
159                        status,
160                        headers,
161                        body,
162                    });
163                }
164                Err(e) => {
165                    if retries < max_retries {
166                        retries += 1;
167                        let delay = Duration::from_millis(500 * 2u64.pow(retries - 1));
168                        tokio::time::sleep(delay).await;
169                        continue;
170                    }
171                    return Err(e.into());
172                }
173            }
174        }
175    }
176
177    /// POST form data (url-encoded) and return a response with all headers.
178    ///
179    /// Unlike `get()`, this captures *all* response headers (not a filtered
180    /// subset), because callers like the auth module need `set-cookie` headers.
181    pub async fn post_form(
182        &self,
183        url: &str,
184        form_fields: &[(String, String)],
185        extra_headers: &[(String, String)],
186        timeout_ms: u64,
187    ) -> Result<HttpResponse> {
188        let mut builder = self
189            .client
190            .post(url)
191            .timeout(Duration::from_millis(timeout_ms));
192
193        for (name, value) in extra_headers {
194            builder = builder.header(name.as_str(), value.as_str());
195        }
196
197        builder = builder.form(form_fields);
198
199        let r = builder.send().await?;
200        let status = r.status().as_u16();
201        let final_url = r.url().to_string();
202
203        let headers: Vec<(String, String)> = r
204            .headers()
205            .iter()
206            .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
207            .collect();
208
209        let body = r.text().await.unwrap_or_default();
210
211        Ok(HttpResponse {
212            url: url.to_string(),
213            final_url,
214            status,
215            headers,
216            body,
217        })
218    }
219
220    /// Perform parallel GET requests with bounded concurrency.
221    pub async fn get_many(
222        &self,
223        urls: &[String],
224        concurrency: usize,
225        timeout_ms: u64,
226    ) -> Vec<Result<HttpResponse>> {
227        use futures::stream::{self, StreamExt};
228
229        let results: Vec<Result<HttpResponse>> = stream::iter(urls.iter())
230            .map(|url| {
231                let client = self.clone();
232                let u = url.clone();
233                let t = timeout_ms;
234                async move { client.get(&u, t).await }
235            })
236            .buffer_unordered(concurrency)
237            .collect()
238            .await;
239
240        results
241    }
242
243    /// Perform parallel HEAD requests with bounded concurrency.
244    pub async fn head_many(
245        &self,
246        urls: &[String],
247        concurrency: usize,
248    ) -> Vec<Result<HeadResponse>> {
249        use futures::stream::{self, StreamExt};
250
251        let results: Vec<Result<HeadResponse>> = stream::iter(urls.iter())
252            .map(|url| {
253                let client = self.client.clone();
254                let u = url.clone();
255                async move {
256                    let resp = client
257                        .head(&u)
258                        .timeout(Duration::from_secs(10))
259                        .send()
260                        .await?;
261
262                    let status = resp.status().as_u16();
263                    let content_type = resp
264                        .headers()
265                        .get("content-type")
266                        .and_then(|v| v.to_str().ok())
267                        .map(|s| s.to_string());
268                    let content_language = resp
269                        .headers()
270                        .get("content-language")
271                        .and_then(|v| v.to_str().ok())
272                        .map(|s| s.to_string());
273                    let last_modified = resp
274                        .headers()
275                        .get("last-modified")
276                        .and_then(|v| v.to_str().ok())
277                        .map(|s| s.to_string());
278                    let cache_control = resp
279                        .headers()
280                        .get("cache-control")
281                        .and_then(|v| v.to_str().ok())
282                        .map(|s| s.to_string());
283
284                    Ok(HeadResponse {
285                        url: u,
286                        status,
287                        content_type,
288                        content_language,
289                        last_modified,
290                        cache_control,
291                    })
292                }
293            })
294            .buffer_unordered(concurrency)
295            .collect()
296            .await;
297
298        results
299    }
300}
301
#[cfg(test)]
mod tests {
    use super::*;

    /// Building both underlying clients must not panic.
    #[test]
    fn test_http_client_creation() {
        let _client = HttpClient::new(10_000);
    }

    /// A hand-built `HeadResponse` keeps the status it was given.
    #[test]
    fn test_head_response_defaults() {
        let resp = HeadResponse {
            url: String::from("https://example.com"),
            status: 200,
            content_type: Some(String::from("text/html")),
            content_language: None,
            last_modified: None,
            cache_control: None,
        };
        assert_eq!(200, resp.status);
    }
}
325}