spider_downloader/
client.rs1use crate::Downloader;
8use async_trait::async_trait;
9use log::{Level, debug, log_enabled, warn};
10use moka::sync::Cache;
11use reqwest::{Client, Proxy};
12use spider_util::error::SpiderError;
13use spider_util::request::{Body, Request};
14use spider_util::response::Response;
15use std::time::Duration;
16
pub struct ReqwestClientDownloader {
    /// Base client used for every request that does not carry a proxy in its meta.
    client: Client,
    /// Request timeout applied to the base client and to every per-proxy client built later.
    timeout: Duration,
    /// Lazily-built clients keyed by proxy URL; see `get_or_create_proxy_client`.
    proxy_clients: Cache<String, Client>,
}
24
25#[async_trait]
26impl Downloader for ReqwestClientDownloader {
27 type Client = Client;
28
29 fn client(&self) -> &Self::Client {
31 &self.client
32 }
33
34 async fn download(&self, request: Request) -> Result<Response, SpiderError> {
35 if log_enabled!(Level::Debug) {
36 debug!(
37 "Downloading {} (fingerprint: {})",
38 request.url,
39 request.fingerprint()
40 );
41 }
42
43 let client_to_use = self.select_client_for_request(&request);
44 let mut request = request;
45 let meta = request.take_meta();
46 let Request {
47 url: request_url,
48 method,
49 headers,
50 body,
51 ..
52 } = request;
53
54 let mut req_builder = client_to_use.request(method, request_url.clone());
55
56 if let Some(body_content) = body {
57 req_builder = match body_content {
58 Body::Json(json_val) => req_builder.json(&json_val),
59 Body::Form(form_val) => req_builder.form(&Self::form_pairs(&form_val)),
60 Body::Bytes(bytes_val) => req_builder.body(bytes_val),
61 };
62 }
63
64 let res = req_builder.headers(headers).send().await?;
65
66 let response_url = res.url().clone();
67 let status = res.status();
68 let response_headers = res.headers().clone();
69 let response_body = res.bytes().await?;
70
71 Ok(Response {
72 url: response_url,
73 status,
74 headers: response_headers,
75 body: response_body,
76 request_url,
77 meta,
78 cached: false,
79 })
80 }
81}
82
83impl ReqwestClientDownloader {
84 const PROXY_CLIENT_CACHE_MAX_CAPACITY: u64 = 512;
85 const PROXY_CLIENT_CACHE_TTL_SECS: u64 = 30 * 60;
86 const PROXY_META_KEY: &str = "proxy";
87 const DEFAULT_USER_AGENT: &'static str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36";
88
89 pub fn new() -> Self {
91 Self::new_with_timeout(Duration::from_secs(30))
92 }
93
94 pub fn new_with_timeout(timeout: Duration) -> Self {
96 match Self::try_new_with_timeout(timeout) {
97 Ok(downloader) => downloader,
98 Err(err) => panic!(
99 "failed to create reqwest downloader with timeout {:?}: {}",
100 timeout, err
101 ),
102 }
103 }
104
105 pub fn try_new_with_timeout(timeout: Duration) -> Result<Self, SpiderError> {
107 let base_client = Self::build_client(
108 timeout,
109 None,
110 512,
111 Duration::from_secs(120),
112 Duration::from_secs(60),
113 Duration::from_secs(10),
114 )?;
115
116 Ok(ReqwestClientDownloader {
117 client: base_client,
118 timeout,
119 proxy_clients: Cache::builder()
120 .max_capacity(Self::PROXY_CLIENT_CACHE_MAX_CAPACITY)
121 .time_to_idle(Duration::from_secs(Self::PROXY_CLIENT_CACHE_TTL_SECS))
122 .build(),
123 })
124 }
125
126 fn proxy_from_request(request: &Request) -> Option<String> {
127 request.meta_inner().as_ref().and_then(|meta_map| {
128 meta_map
129 .get(Self::PROXY_META_KEY)
130 .and_then(|proxy_val| proxy_val.as_str().map(str::to_owned))
131 })
132 }
133
134 fn form_pairs(form: &dashmap::DashMap<String, String>) -> Vec<(String, String)> {
135 let mut pairs = Vec::with_capacity(form.len());
136 for entry in form.iter() {
137 pairs.push((entry.key().clone(), entry.value().clone()));
138 }
139 pairs
140 }
141
142 fn build_client(
143 timeout: Duration,
144 proxy: Option<Proxy>,
145 pool_max_idle_per_host: usize,
146 pool_idle_timeout: Duration,
147 tcp_keepalive: Duration,
148 connect_timeout: Duration,
149 ) -> Result<Client, SpiderError> {
150 let mut builder = Client::builder()
151 .timeout(timeout)
152 .pool_max_idle_per_host(pool_max_idle_per_host)
153 .pool_idle_timeout(pool_idle_timeout)
154 .tcp_keepalive(tcp_keepalive)
155 .tcp_nodelay(true)
156 .connect_timeout(connect_timeout)
157 .user_agent(Self::DEFAULT_USER_AGENT);
158
159 if let Some(proxy) = proxy {
160 builder = builder.proxy(proxy);
161 }
162
163 builder
164 .build()
165 .map_err(|err| SpiderError::ReqwestError(err.into()))
166 }
167
168 fn select_client_for_request(&self, request: &Request) -> Client {
169 if let Some(proxy_url) = Self::proxy_from_request(request)
170 && let Some(proxy_client) = self.get_or_create_proxy_client(&proxy_url)
171 {
172 return proxy_client;
173 }
174
175 self.client.clone()
176 }
177
178 fn get_or_create_proxy_client(&self, proxy_url: &str) -> Option<Client> {
180 if let Some(client) = self.proxy_clients.get(proxy_url) {
181 return Some(client);
182 }
183
184 let proxy = match Proxy::all(proxy_url) {
185 Ok(proxy) => proxy,
186 Err(err) => {
187 warn!(
188 "Invalid proxy URL '{}': {}. Falling back to base client",
189 proxy_url, err
190 );
191 return None;
192 }
193 };
194
195 let proxy_client = match Self::build_client(
196 self.timeout,
197 Some(proxy),
198 128,
199 Duration::from_secs(90),
200 Duration::from_secs(30),
201 Duration::from_secs(5),
202 ) {
203 Ok(client) => client,
204 Err(err) => {
205 warn!(
206 "Failed to build client for proxy '{}': {}. Falling back to base client",
207 proxy_url, err
208 );
209 return None;
210 }
211 };
212
213 if let Some(client) = self.proxy_clients.get(proxy_url) {
214 return Some(client);
215 }
216 self.proxy_clients
217 .insert(proxy_url.to_string(), proxy_client.clone());
218 Some(proxy_client)
219 }
220}
221
222impl Default for ReqwestClientDownloader {
223 fn default() -> Self {
224 Self::new()
225 }
226}