use crate::Downloader;
use async_trait::async_trait;
use log::{debug, warn};
use moka::sync::Cache;
use reqwest::{Client, Proxy};
use spider_util::error::SpiderError;
use spider_util::request::{Body, Request};
use spider_util::response::Response;
use std::time::Duration;
21
/// Downloader backed by `reqwest`, with an idle-expiring cache of
/// per-proxy clients alongside a shared base client.
pub struct ReqwestClientDownloader {
    // Base client used for every request that carries no proxy metadata.
    client: Client,
    // Request timeout; applied to the base client and to each proxy client.
    timeout: Duration,
    // Lazily-built clients keyed by proxy URL string; capacity and
    // time-to-idle are configured in `try_new_with_timeout`.
    proxy_clients: Cache<String, Client>,
}
29
30#[async_trait]
31impl Downloader for ReqwestClientDownloader {
32 type Client = Client;
33
34 fn client(&self) -> &Self::Client {
36 &self.client
37 }
38
39 async fn download(&self, request: Request) -> Result<Response, SpiderError> {
40 debug!(
41 "Downloading {} (fingerprint: {})",
42 request.url,
43 request.fingerprint()
44 );
45
46 let url = request.url.clone();
47 let body = request.body.clone();
48 let client_to_use = self.select_client_for_request(&request);
49
50 let mut req_builder = client_to_use.request(request.method.clone(), url.clone());
51
52 if let Some(body_content) = body {
53 req_builder = match body_content {
54 Body::Json(json_val) => req_builder.json(&json_val),
55 Body::Form(form_val) => {
56 let mut form_map = std::collections::HashMap::new();
57 for entry in form_val.iter() {
58 form_map.insert(entry.key().clone(), entry.value().clone());
59 }
60 req_builder.form(&form_map)
61 }
62 Body::Bytes(bytes_val) => req_builder.body(bytes_val),
63 };
64 }
65
66 let res = req_builder.headers(request.headers.clone()).send().await?;
67
68 let response_url = res.url().clone();
69 let status = res.status();
70 let response_headers = res.headers().clone();
71 let response_body = res.bytes().await?;
72
73 Ok(Response {
74 url: response_url,
75 status,
76 headers: response_headers,
77 body: response_body,
78 request_url: url,
79 meta: request.meta_inner().clone(),
80 cached: false,
81 })
82 }
83}
84
85impl ReqwestClientDownloader {
86 const PROXY_CLIENT_CACHE_MAX_CAPACITY: u64 = 512;
87 const PROXY_CLIENT_CACHE_TTL_SECS: u64 = 30 * 60;
88 const PROXY_META_KEY: &str = "proxy";
89
90 pub fn new() -> Self {
92 Self::new_with_timeout(Duration::from_secs(30))
93 }
94
95 pub fn new_with_timeout(timeout: Duration) -> Self {
97 match Self::try_new_with_timeout(timeout) {
98 Ok(downloader) => downloader,
99 Err(err) => panic!(
100 "failed to create reqwest downloader with timeout {:?}: {}",
101 timeout, err
102 ),
103 }
104 }
105
106 pub fn try_new_with_timeout(timeout: Duration) -> Result<Self, SpiderError> {
108 let base_client = Client::builder()
109 .timeout(timeout)
110 .pool_max_idle_per_host(200)
111 .pool_idle_timeout(Duration::from_secs(120))
112 .tcp_keepalive(Duration::from_secs(60))
113 .connect_timeout(Duration::from_secs(10))
114 .build()
115 .map_err(|e| SpiderError::ReqwestError(e.into()))?;
116
117 Ok(ReqwestClientDownloader {
118 client: base_client.clone(),
119 timeout,
120 proxy_clients: Cache::builder()
121 .max_capacity(Self::PROXY_CLIENT_CACHE_MAX_CAPACITY)
122 .time_to_idle(Duration::from_secs(Self::PROXY_CLIENT_CACHE_TTL_SECS))
123 .build(),
124 })
125 }
126
127 fn proxy_from_request(request: &Request) -> Option<String> {
128 request.meta_inner().as_ref().and_then(|meta_map| {
129 meta_map
130 .get(Self::PROXY_META_KEY)
131 .and_then(|proxy_val| proxy_val.as_str().map(str::to_owned))
132 })
133 }
134
135 fn select_client_for_request(&self, request: &Request) -> Client {
136 if let Some(proxy_url) = Self::proxy_from_request(request)
137 && let Some(proxy_client) = self.get_or_create_proxy_client(&proxy_url)
138 {
139 return proxy_client;
140 }
141
142 self.client.clone()
143 }
144
145 fn get_or_create_proxy_client(&self, proxy_url: &str) -> Option<Client> {
147 if let Some(client) = self.proxy_clients.get(proxy_url) {
148 return Some(client);
149 }
150
151 let proxy = match Proxy::all(proxy_url) {
152 Ok(proxy) => proxy,
153 Err(err) => {
154 warn!(
155 "Invalid proxy URL '{}': {}. Falling back to base client",
156 proxy_url, err
157 );
158 return None;
159 }
160 };
161
162 let proxy_client = match Client::builder()
163 .timeout(self.timeout)
164 .pool_max_idle_per_host(50)
165 .pool_idle_timeout(Duration::from_secs(90))
166 .tcp_keepalive(Duration::from_secs(30))
167 .connect_timeout(Duration::from_secs(5))
168 .proxy(proxy)
169 .build()
170 {
171 Ok(client) => client,
172 Err(err) => {
173 warn!(
174 "Failed to build client for proxy '{}': {}. Falling back to base client",
175 proxy_url, err
176 );
177 return None;
178 }
179 };
180
181 if let Some(client) = self.proxy_clients.get(proxy_url) {
182 return Some(client);
183 }
184 self.proxy_clients
185 .insert(proxy_url.to_string(), proxy_client.clone());
186 Some(proxy_client)
187 }
188}
189
190impl Default for ReqwestClientDownloader {
191 fn default() -> Self {
192 Self::new()
193 }
194}
195
#[cfg(test)]
mod tests {
    use super::ReqwestClientDownloader;
    use spider_util::request::Request;

    // Shared fixture: a request to a fixed URL with no meta attached.
    fn plain_request() -> Request {
        Request::new(reqwest::Url::parse("https://example.com").expect("valid url"))
    }

    #[test]
    fn proxy_meta_parsing_returns_none_when_missing() {
        // A request without meta carries no proxy entry.
        assert_eq!(
            ReqwestClientDownloader::proxy_from_request(&plain_request()),
            None
        );
    }

    #[test]
    fn invalid_proxy_falls_back_without_error() {
        let downloader = ReqwestClientDownloader::new();

        // A malformed proxy URL must yield None and leave the cache empty.
        assert!(downloader
            .get_or_create_proxy_client("://invalid-proxy")
            .is_none());
        assert_eq!(downloader.proxy_clients.entry_count(), 0);
    }

    #[test]
    fn valid_proxy_client_is_cached_and_reused() {
        let downloader = ReqwestClientDownloader::new();
        let proxy = "http://127.0.0.1:8080";

        // First pass builds and caches a client; second pass reuses it.
        for _ in 0..2 {
            assert!(downloader.get_or_create_proxy_client(proxy).is_some());
            assert!(downloader.proxy_clients.get(proxy).is_some());
        }
    }

    #[test]
    fn request_without_proxy_uses_base_client_path() {
        let downloader = ReqwestClientDownloader::new();

        let _ = downloader.select_client_for_request(&plain_request());
        // The base-client path must not create any proxy cache entries.
        assert_eq!(downloader.proxy_clients.entry_count(), 0);
    }
}
237}