Skip to main content

spider_downloader/
client.rs

//! Reqwest-backed downloader implementation.
//!
//! [`ReqwestClientDownloader`] is the default downloader used throughout the
//! workspace. It maps runtime requests into reqwest calls and converts the
//! result back into the shared response type.

7use crate::Downloader;
8use async_trait::async_trait;
9use log::{Level, debug, log_enabled, warn};
10use moka::sync::Cache;
11use reqwest::header::{ACCEPT, ACCEPT_LANGUAGE, HeaderMap, HeaderValue, USER_AGENT};
12use reqwest::{Client, Proxy};
13use spider_util::error::SpiderError;
14use spider_util::request::{Body, Request};
15use spider_util::response::Response;
16use std::time::Duration;
17
/// Downloader implementation backed by `reqwest::Client`.
///
/// Holds one shared base client plus a bounded cache of per-proxy clients,
/// so requests routed through the same proxy endpoint reuse a connection
/// pool instead of rebuilding a client per request.
pub struct ReqwestClientDownloader {
    // Base HTTP client used when a request carries no proxy metadata.
    client: Client,
    // Request timeout applied to every client this downloader builds
    // (base and per-proxy alike).
    timeout: Duration,
    // When true, browser-like defaults (User-Agent, Accept, Accept-Language)
    // are inserted into outgoing requests that do not already set them.
    browser_like_headers: bool,
    /// Per-proxy clients with TTL/capacity bounds to avoid unbounded growth.
    proxy_clients: Cache<String, Client>,
}
26
27#[async_trait]
28impl Downloader for ReqwestClientDownloader {
29    type Client = Client;
30
31    /// Returns a reference to the underlying HTTP client.
32    fn client(&self) -> &Self::Client {
33        &self.client
34    }
35
36    async fn download(&self, request: Request) -> Result<Response, SpiderError> {
37        if log_enabled!(Level::Debug) {
38            debug!(
39                "Downloading {} (fingerprint: {})",
40                request.url,
41                request.fingerprint()
42            );
43        }
44
45        let client_to_use = self.select_client_for_request(&request);
46        let mut request = request;
47        let meta = request.take_meta();
48        let Request {
49            url: request_url,
50            priority: request_priority,
51            method,
52            mut headers,
53            body,
54            ..
55        } = request;
56
57        self.apply_default_headers(&mut headers);
58
59        let mut req_builder = client_to_use.request(method.into(), request_url.clone());
60
61        if let Some(body_content) = body {
62            req_builder = match body_content {
63                Body::Json(json_val) => req_builder.json(&json_val),
64                Body::Form(form_val) => req_builder.form(&Self::form_pairs(&form_val)),
65                Body::Bytes(bytes_val) => req_builder.body(bytes_val),
66            };
67        }
68
69        let res = req_builder.headers(headers).send().await?;
70
71        let response_url = res.url().clone();
72        let status = res.status();
73        let response_headers = res.headers().clone();
74        let response_body = res.bytes().await?;
75
76        Ok(Response {
77            url: response_url,
78            status,
79            headers: response_headers,
80            body: response_body,
81            request_url,
82            request_priority,
83            meta,
84            cached: false,
85        })
86    }
87}
88
89impl ReqwestClientDownloader {
90    const PROXY_CLIENT_CACHE_MAX_CAPACITY: u64 = 512;
91    const PROXY_CLIENT_CACHE_TTL_SECS: u64 = 30 * 60;
92    const PROXY_META_KEY: &str = "proxy";
93    const DEFAULT_USER_AGENT: &'static str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36";
94    const DEFAULT_ACCEPT: &'static str =
95        "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8";
96    const DEFAULT_ACCEPT_LANGUAGE: &'static str = "en-US,en;q=0.9";
97
98    /// Creates a new `ReqwestClientDownloader` with a default timeout of 30 seconds.
99    pub fn new() -> Self {
100        Self::new_with_timeout(Duration::from_secs(30))
101    }
102
103    /// Creates a new `ReqwestClientDownloader` with a specified request timeout.
104    pub fn new_with_timeout(timeout: Duration) -> Self {
105        match Self::try_new_with_timeout(timeout) {
106            Ok(downloader) => downloader,
107            Err(err) => panic!(
108                "failed to create reqwest downloader with timeout {:?}: {}",
109                timeout, err
110            ),
111        }
112    }
113
114    /// Tries to create a new `ReqwestClientDownloader` with a specified request timeout.
115    pub fn try_new_with_timeout(timeout: Duration) -> Result<Self, SpiderError> {
116        let base_client = Self::build_client(
117            timeout,
118            None,
119            512,
120            Duration::from_secs(120),
121            Duration::from_secs(60),
122            Duration::from_secs(10),
123        )?;
124
125        Ok(ReqwestClientDownloader {
126            client: base_client,
127            timeout,
128            browser_like_headers: true,
129            proxy_clients: Cache::builder()
130                .max_capacity(Self::PROXY_CLIENT_CACHE_MAX_CAPACITY)
131                .time_to_idle(Duration::from_secs(Self::PROXY_CLIENT_CACHE_TTL_SECS))
132                .build(),
133        })
134    }
135
136    /// Enables or disables balanced browser-like default headers when request headers are missing.
137    pub fn with_browser_like_headers(mut self, enabled: bool) -> Self {
138        self.browser_like_headers = enabled;
139        self
140    }
141
142    fn proxy_from_request(request: &Request) -> Option<String> {
143        request.meta_inner().as_ref().and_then(|meta_map| {
144            meta_map
145                .get(Self::PROXY_META_KEY)
146                .and_then(|proxy_val| proxy_val.as_str().map(str::to_owned))
147        })
148    }
149
150    fn form_pairs(form: &dashmap::DashMap<String, String>) -> Vec<(String, String)> {
151        let mut pairs = Vec::with_capacity(form.len());
152        for entry in form.iter() {
153            pairs.push((entry.key().clone(), entry.value().clone()));
154        }
155        pairs
156    }
157
158    fn build_client(
159        timeout: Duration,
160        proxy: Option<Proxy>,
161        pool_max_idle_per_host: usize,
162        pool_idle_timeout: Duration,
163        tcp_keepalive: Duration,
164        connect_timeout: Duration,
165    ) -> Result<Client, SpiderError> {
166        let mut builder = Client::builder()
167            .timeout(timeout)
168            .pool_max_idle_per_host(pool_max_idle_per_host)
169            .pool_idle_timeout(pool_idle_timeout)
170            .tcp_keepalive(tcp_keepalive)
171            .tcp_nodelay(true)
172            .connect_timeout(connect_timeout);
173
174        if let Some(proxy) = proxy {
175            builder = builder.proxy(proxy);
176        }
177
178        builder
179            .build()
180            .map_err(|err| SpiderError::ReqwestError(err.into()))
181    }
182
183    fn select_client_for_request(&self, request: &Request) -> Client {
184        if let Some(proxy_url) = Self::proxy_from_request(request)
185            && let Some(proxy_client) = self.get_or_create_proxy_client(&proxy_url)
186        {
187            return proxy_client;
188        }
189
190        self.client.clone()
191    }
192
193    fn apply_default_headers(&self, headers: &mut HeaderMap) {
194        if !self.browser_like_headers {
195            return;
196        }
197
198        insert_if_missing(headers, USER_AGENT, Self::DEFAULT_USER_AGENT);
199        insert_if_missing(headers, ACCEPT, Self::DEFAULT_ACCEPT);
200        insert_if_missing(headers, ACCEPT_LANGUAGE, Self::DEFAULT_ACCEPT_LANGUAGE);
201    }
202
203    /// Gets or creates a proxy-specific client to preserve connection pooling per proxy endpoint.
204    fn get_or_create_proxy_client(&self, proxy_url: &str) -> Option<Client> {
205        if let Some(client) = self.proxy_clients.get(proxy_url) {
206            return Some(client);
207        }
208
209        let proxy = match Proxy::all(proxy_url) {
210            Ok(proxy) => proxy,
211            Err(err) => {
212                warn!(
213                    "Invalid proxy URL '{}': {}. Falling back to base client",
214                    proxy_url, err
215                );
216                return None;
217            }
218        };
219
220        let proxy_client = match Self::build_client(
221            self.timeout,
222            Some(proxy),
223            128,
224            Duration::from_secs(90),
225            Duration::from_secs(30),
226            Duration::from_secs(5),
227        ) {
228            Ok(client) => client,
229            Err(err) => {
230                warn!(
231                    "Failed to build client for proxy '{}': {}. Falling back to base client",
232                    proxy_url, err
233                );
234                return None;
235            }
236        };
237
238        if let Some(client) = self.proxy_clients.get(proxy_url) {
239            return Some(client);
240        }
241        self.proxy_clients
242            .insert(proxy_url.to_string(), proxy_client.clone());
243        Some(proxy_client)
244    }
245}
246
impl Default for ReqwestClientDownloader {
    /// Equivalent to [`ReqwestClientDownloader::new`]: a downloader with a
    /// 30-second request timeout and browser-like headers enabled.
    fn default() -> Self {
        Self::new()
    }
}
252
253fn insert_if_missing(
254    headers: &mut HeaderMap,
255    name: reqwest::header::HeaderName,
256    value: &'static str,
257) {
258    if headers.contains_key(&name) {
259        return;
260    }
261
262    headers.insert(name, HeaderValue::from_static(value));
263}