1use crate::Downloader;
8use async_trait::async_trait;
9use log::{Level, debug, log_enabled, warn};
10use moka::sync::Cache;
11use reqwest::header::{ACCEPT, ACCEPT_LANGUAGE, HeaderMap, HeaderValue, USER_AGENT};
12use reqwest::{Client, Proxy};
13use spider_util::error::SpiderError;
14use spider_util::request::{Body, Request};
15use spider_util::response::Response;
16use std::time::Duration;
17
/// A [`Downloader`] backed by `reqwest`, with optional per-request proxies.
///
/// A single shared client serves non-proxied traffic; requests that carry a
/// proxy URL in their meta get a dedicated client from a bounded cache.
pub struct ReqwestClientDownloader {
 // Shared base client used for all requests without a proxy.
 client: Client,
 // Per-request timeout; also applied to lazily built proxy clients.
 timeout: Duration,
 // When true, missing User-Agent / Accept / Accept-Language headers are
 // filled with browser-like defaults before sending.
 browser_like_headers: bool,
 // Lazily built clients keyed by proxy URL (capacity-bounded, idle-evicted).
 proxy_clients: Cache<String, Client>,
}
26
#[async_trait]
impl Downloader for ReqwestClientDownloader {
 type Client = Client;

 /// Returns the shared base (non-proxied) HTTP client.
 fn client(&self) -> &Self::Client {
 &self.client
 }

 /// Performs the HTTP request and converts the reqwest response into a
 /// [`Response`], carrying the original URL, priority, and meta through.
 ///
 /// # Errors
 /// Propagates `reqwest` send/body errors via `SpiderError`'s `From` impl.
 async fn download(&self, request: Request) -> Result<Response, SpiderError> {
 // Guarded so `request.fingerprint()` is only computed when debug
 // logging is actually enabled.
 if log_enabled!(Level::Debug) {
 debug!(
 "Downloading {} (fingerprint: {})",
 request.url,
 request.fingerprint()
 );
 }

 // IMPORTANT: the client must be chosen BEFORE `take_meta()` below,
 // because proxy selection reads the "proxy" key out of the meta.
 let client_to_use = self.select_client_for_request(&request);
 let mut request = request;
 // Detach meta so it can be moved into the Response at the end.
 let meta = request.take_meta();
 let Request {
 url: request_url,
 priority: request_priority,
 method,
 mut headers,
 body,
 ..
 } = request;

 // Fill in browser-like defaults only for headers the caller left unset.
 self.apply_default_headers(&mut headers);

 let mut req_builder = client_to_use.request(method.into(), request_url.clone());

 // Body variants map onto reqwest's builder methods; `.json`/`.form`
 // also set the matching Content-Type when the caller did not.
 if let Some(body_content) = body {
 req_builder = match body_content {
 Body::Json(json_val) => req_builder.json(&json_val),
 Body::Form(form_val) => req_builder.form(&Self::form_pairs(&form_val)),
 Body::Bytes(bytes_val) => req_builder.body(bytes_val),
 };
 }

 let res = req_builder.headers(headers).send().await?;

 // Capture metadata before `bytes()` consumes the response.
 let response_url = res.url().clone();
 let status = res.status();
 let response_headers = res.headers().clone();
 let response_body = res.bytes().await?;

 Ok(Response {
 url: response_url,
 status,
 headers: response_headers,
 body: response_body,
 request_url,
 request_priority,
 meta,
 cached: false,
 })
 }
}
88
89impl ReqwestClientDownloader {
90 const PROXY_CLIENT_CACHE_MAX_CAPACITY: u64 = 512;
91 const PROXY_CLIENT_CACHE_TTL_SECS: u64 = 30 * 60;
92 const PROXY_META_KEY: &str = "proxy";
93 const DEFAULT_USER_AGENT: &'static str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36";
94 const DEFAULT_ACCEPT: &'static str =
95 "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8";
96 const DEFAULT_ACCEPT_LANGUAGE: &'static str = "en-US,en;q=0.9";
97
98 pub fn new() -> Self {
100 Self::new_with_timeout(Duration::from_secs(30))
101 }
102
103 pub fn new_with_timeout(timeout: Duration) -> Self {
105 match Self::try_new_with_timeout(timeout) {
106 Ok(downloader) => downloader,
107 Err(err) => panic!(
108 "failed to create reqwest downloader with timeout {:?}: {}",
109 timeout, err
110 ),
111 }
112 }
113
114 pub fn try_new_with_timeout(timeout: Duration) -> Result<Self, SpiderError> {
116 let base_client = Self::build_client(
117 timeout,
118 None,
119 512,
120 Duration::from_secs(120),
121 Duration::from_secs(60),
122 Duration::from_secs(10),
123 )?;
124
125 Ok(ReqwestClientDownloader {
126 client: base_client,
127 timeout,
128 browser_like_headers: true,
129 proxy_clients: Cache::builder()
130 .max_capacity(Self::PROXY_CLIENT_CACHE_MAX_CAPACITY)
131 .time_to_idle(Duration::from_secs(Self::PROXY_CLIENT_CACHE_TTL_SECS))
132 .build(),
133 })
134 }
135
136 pub fn with_browser_like_headers(mut self, enabled: bool) -> Self {
138 self.browser_like_headers = enabled;
139 self
140 }
141
142 fn proxy_from_request(request: &Request) -> Option<String> {
143 request.meta_inner().as_ref().and_then(|meta_map| {
144 meta_map
145 .get(Self::PROXY_META_KEY)
146 .and_then(|proxy_val| proxy_val.as_str().map(str::to_owned))
147 })
148 }
149
150 fn form_pairs(form: &dashmap::DashMap<String, String>) -> Vec<(String, String)> {
151 let mut pairs = Vec::with_capacity(form.len());
152 for entry in form.iter() {
153 pairs.push((entry.key().clone(), entry.value().clone()));
154 }
155 pairs
156 }
157
158 fn build_client(
159 timeout: Duration,
160 proxy: Option<Proxy>,
161 pool_max_idle_per_host: usize,
162 pool_idle_timeout: Duration,
163 tcp_keepalive: Duration,
164 connect_timeout: Duration,
165 ) -> Result<Client, SpiderError> {
166 let mut builder = Client::builder()
167 .timeout(timeout)
168 .pool_max_idle_per_host(pool_max_idle_per_host)
169 .pool_idle_timeout(pool_idle_timeout)
170 .tcp_keepalive(tcp_keepalive)
171 .tcp_nodelay(true)
172 .connect_timeout(connect_timeout);
173
174 if let Some(proxy) = proxy {
175 builder = builder.proxy(proxy);
176 }
177
178 builder
179 .build()
180 .map_err(|err| SpiderError::ReqwestError(err.into()))
181 }
182
183 fn select_client_for_request(&self, request: &Request) -> Client {
184 if let Some(proxy_url) = Self::proxy_from_request(request)
185 && let Some(proxy_client) = self.get_or_create_proxy_client(&proxy_url)
186 {
187 return proxy_client;
188 }
189
190 self.client.clone()
191 }
192
193 fn apply_default_headers(&self, headers: &mut HeaderMap) {
194 if !self.browser_like_headers {
195 return;
196 }
197
198 insert_if_missing(headers, USER_AGENT, Self::DEFAULT_USER_AGENT);
199 insert_if_missing(headers, ACCEPT, Self::DEFAULT_ACCEPT);
200 insert_if_missing(headers, ACCEPT_LANGUAGE, Self::DEFAULT_ACCEPT_LANGUAGE);
201 }
202
203 fn get_or_create_proxy_client(&self, proxy_url: &str) -> Option<Client> {
205 if let Some(client) = self.proxy_clients.get(proxy_url) {
206 return Some(client);
207 }
208
209 let proxy = match Proxy::all(proxy_url) {
210 Ok(proxy) => proxy,
211 Err(err) => {
212 warn!(
213 "Invalid proxy URL '{}': {}. Falling back to base client",
214 proxy_url, err
215 );
216 return None;
217 }
218 };
219
220 let proxy_client = match Self::build_client(
221 self.timeout,
222 Some(proxy),
223 128,
224 Duration::from_secs(90),
225 Duration::from_secs(30),
226 Duration::from_secs(5),
227 ) {
228 Ok(client) => client,
229 Err(err) => {
230 warn!(
231 "Failed to build client for proxy '{}': {}. Falling back to base client",
232 proxy_url, err
233 );
234 return None;
235 }
236 };
237
238 if let Some(client) = self.proxy_clients.get(proxy_url) {
239 return Some(client);
240 }
241 self.proxy_clients
242 .insert(proxy_url.to_string(), proxy_client.clone());
243 Some(proxy_client)
244 }
245}
246
247impl Default for ReqwestClientDownloader {
248 fn default() -> Self {
249 Self::new()
250 }
251}
252
253fn insert_if_missing(
254 headers: &mut HeaderMap,
255 name: reqwest::header::HeaderName,
256 value: &'static str,
257) {
258 if headers.contains_key(&name) {
259 return;
260 }
261
262 headers.insert(name, HeaderValue::from_static(value));
263}