// spider_downloader/client.rs
1//! Reqwest-backed downloader implementation.
2//!
3//! [`ReqwestClientDownloader`] is the default downloader used throughout the
4//! workspace. It maps runtime requests into reqwest calls and converts the
5//! result back into the shared response type.
6
7use crate::Downloader;
8use async_trait::async_trait;
9use log::{Level, debug, log_enabled, warn};
10use moka::sync::Cache;
11use reqwest::{Client, Proxy};
12use spider_util::error::SpiderError;
13use spider_util::request::{Body, Request};
14use spider_util::response::Response;
15use std::time::Duration;
16
/// Downloader implementation backed by `reqwest::Client`.
pub struct ReqwestClientDownloader {
    /// Base HTTP client used for all requests that do not specify a proxy.
    client: Client,
    /// Request timeout applied to the base client and to every proxy client.
    timeout: Duration,
    /// Per-proxy clients with TTL/capacity bounds to avoid unbounded growth.
    proxy_clients: Cache<String, Client>,
}
24
25#[async_trait]
26impl Downloader for ReqwestClientDownloader {
27    type Client = Client;
28
29    /// Returns a reference to the underlying HTTP client.
30    fn client(&self) -> &Self::Client {
31        &self.client
32    }
33
34    async fn download(&self, request: Request) -> Result<Response, SpiderError> {
35        if log_enabled!(Level::Debug) {
36            debug!(
37                "Downloading {} (fingerprint: {})",
38                request.url,
39                request.fingerprint()
40            );
41        }
42
43        let client_to_use = self.select_client_for_request(&request);
44        let mut request = request;
45        let meta = request.take_meta();
46        let Request {
47            url: request_url,
48            method,
49            headers,
50            body,
51            ..
52        } = request;
53
54        let mut req_builder = client_to_use.request(method, request_url.clone());
55
56        if let Some(body_content) = body {
57            req_builder = match body_content {
58                Body::Json(json_val) => req_builder.json(&json_val),
59                Body::Form(form_val) => req_builder.form(&Self::form_pairs(&form_val)),
60                Body::Bytes(bytes_val) => req_builder.body(bytes_val),
61            };
62        }
63
64        let res = req_builder.headers(headers).send().await?;
65
66        let response_url = res.url().clone();
67        let status = res.status();
68        let response_headers = res.headers().clone();
69        let response_body = res.bytes().await?;
70
71        Ok(Response {
72            url: response_url,
73            status,
74            headers: response_headers,
75            body: response_body,
76            request_url,
77            meta,
78            cached: false,
79        })
80    }
81}
82
83impl ReqwestClientDownloader {
84    const PROXY_CLIENT_CACHE_MAX_CAPACITY: u64 = 512;
85    const PROXY_CLIENT_CACHE_TTL_SECS: u64 = 30 * 60;
86    const PROXY_META_KEY: &str = "proxy";
87    const DEFAULT_USER_AGENT: &'static str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36";
88
89    /// Creates a new `ReqwestClientDownloader` with a default timeout of 30 seconds.
90    pub fn new() -> Self {
91        Self::new_with_timeout(Duration::from_secs(30))
92    }
93
94    /// Creates a new `ReqwestClientDownloader` with a specified request timeout.
95    pub fn new_with_timeout(timeout: Duration) -> Self {
96        match Self::try_new_with_timeout(timeout) {
97            Ok(downloader) => downloader,
98            Err(err) => panic!(
99                "failed to create reqwest downloader with timeout {:?}: {}",
100                timeout, err
101            ),
102        }
103    }
104
105    /// Tries to create a new `ReqwestClientDownloader` with a specified request timeout.
106    pub fn try_new_with_timeout(timeout: Duration) -> Result<Self, SpiderError> {
107        let base_client = Self::build_client(
108            timeout,
109            None,
110            512,
111            Duration::from_secs(120),
112            Duration::from_secs(60),
113            Duration::from_secs(10),
114        )?;
115
116        Ok(ReqwestClientDownloader {
117            client: base_client,
118            timeout,
119            proxy_clients: Cache::builder()
120                .max_capacity(Self::PROXY_CLIENT_CACHE_MAX_CAPACITY)
121                .time_to_idle(Duration::from_secs(Self::PROXY_CLIENT_CACHE_TTL_SECS))
122                .build(),
123        })
124    }
125
126    fn proxy_from_request(request: &Request) -> Option<String> {
127        request.meta_inner().as_ref().and_then(|meta_map| {
128            meta_map
129                .get(Self::PROXY_META_KEY)
130                .and_then(|proxy_val| proxy_val.as_str().map(str::to_owned))
131        })
132    }
133
134    fn form_pairs(form: &dashmap::DashMap<String, String>) -> Vec<(String, String)> {
135        let mut pairs = Vec::with_capacity(form.len());
136        for entry in form.iter() {
137            pairs.push((entry.key().clone(), entry.value().clone()));
138        }
139        pairs
140    }
141
142    fn build_client(
143        timeout: Duration,
144        proxy: Option<Proxy>,
145        pool_max_idle_per_host: usize,
146        pool_idle_timeout: Duration,
147        tcp_keepalive: Duration,
148        connect_timeout: Duration,
149    ) -> Result<Client, SpiderError> {
150        let mut builder = Client::builder()
151            .timeout(timeout)
152            .pool_max_idle_per_host(pool_max_idle_per_host)
153            .pool_idle_timeout(pool_idle_timeout)
154            .tcp_keepalive(tcp_keepalive)
155            .tcp_nodelay(true)
156            .connect_timeout(connect_timeout)
157            .user_agent(Self::DEFAULT_USER_AGENT);
158
159        if let Some(proxy) = proxy {
160            builder = builder.proxy(proxy);
161        }
162
163        builder
164            .build()
165            .map_err(|err| SpiderError::ReqwestError(err.into()))
166    }
167
168    fn select_client_for_request(&self, request: &Request) -> Client {
169        if let Some(proxy_url) = Self::proxy_from_request(request)
170            && let Some(proxy_client) = self.get_or_create_proxy_client(&proxy_url)
171        {
172            return proxy_client;
173        }
174
175        self.client.clone()
176    }
177
178    /// Gets or creates a proxy-specific client to preserve connection pooling per proxy endpoint.
179    fn get_or_create_proxy_client(&self, proxy_url: &str) -> Option<Client> {
180        if let Some(client) = self.proxy_clients.get(proxy_url) {
181            return Some(client);
182        }
183
184        let proxy = match Proxy::all(proxy_url) {
185            Ok(proxy) => proxy,
186            Err(err) => {
187                warn!(
188                    "Invalid proxy URL '{}': {}. Falling back to base client",
189                    proxy_url, err
190                );
191                return None;
192            }
193        };
194
195        let proxy_client = match Self::build_client(
196            self.timeout,
197            Some(proxy),
198            128,
199            Duration::from_secs(90),
200            Duration::from_secs(30),
201            Duration::from_secs(5),
202        ) {
203            Ok(client) => client,
204            Err(err) => {
205                warn!(
206                    "Failed to build client for proxy '{}': {}. Falling back to base client",
207                    proxy_url, err
208                );
209                return None;
210            }
211        };
212
213        if let Some(client) = self.proxy_clients.get(proxy_url) {
214            return Some(client);
215        }
216        self.proxy_clients
217            .insert(proxy_url.to_string(), proxy_client.clone());
218        Some(proxy_client)
219    }
220}
221
222impl Default for ReqwestClientDownloader {
223    fn default() -> Self {
224        Self::new()
225    }
226}