// spider_downloader/client.rs
use crate::Downloader;
use async_trait::async_trait;
use dashmap::DashMap;
use log::{debug, warn};
use moka::sync::Cache;
use reqwest::{Client, Proxy};
use spider_util::error::SpiderError;
use spider_util::request::{Body, Request};
use spider_util::response::Response;
use std::sync::Arc;
use std::time::Duration;

/// Downloader backed by `reqwest`, maintaining separate connection pools
/// per target host and per proxy URL.
pub struct ReqwestClientDownloader {
    /// Shared base client; also the fallback when a host-specific client
    /// cannot be built.
    client: Client,
    /// Request timeout applied to every client this downloader builds.
    timeout: Duration,
    /// Lazily created per-host clients.
    // NOTE(review): unbounded — a crawl over many distinct hosts grows this
    // map without eviction, unlike the bounded `proxy_clients` cache below.
    host_clients: Arc<DashMap<String, Client>>,
    /// Per-proxy clients, capacity-bounded and evicted by idle time.
    proxy_clients: Cache<String, Client>,
}
33
34#[async_trait]
35impl Downloader for ReqwestClientDownloader {
36 type Client = Client;
37
38 fn client(&self) -> &Self::Client {
40 &self.client
41 }
42
43 async fn download(&self, request: Request) -> Result<Response, SpiderError> {
44 debug!(
45 "Downloading {} (fingerprint: {})",
46 request.url,
47 request.fingerprint()
48 );
49
50 let url = request.url.clone();
51 let body = request.body.clone();
52
53 let host = url.host_str().unwrap_or("").to_string();
55 let mut client_to_use = self.get_or_create_host_client(&host).await;
56
57 if let Some(meta_map) = request.meta_inner().as_ref()
59 && let Some(proxy_val) = meta_map.get("proxy")
60 && let Some(proxy_str) = proxy_val.as_str()
61 {
62 client_to_use = self.get_or_create_proxy_client(proxy_str).await?;
63 }
64
65 let mut req_builder = client_to_use.request(request.method.clone(), url.clone());
66
67 if let Some(body_content) = body {
68 req_builder = match body_content {
69 Body::Json(json_val) => req_builder.json(&json_val),
70 Body::Form(form_val) => {
71 let mut form_map = std::collections::HashMap::new();
72 for entry in form_val.iter() {
73 form_map.insert(entry.key().clone(), entry.value().clone());
74 }
75 req_builder.form(&form_map)
76 }
77 Body::Bytes(bytes_val) => req_builder.body(bytes_val),
78 };
79 }
80
81 let res = req_builder.headers(request.headers.clone()).send().await?;
82
83 let response_url = res.url().clone();
84 let status = res.status();
85 let response_headers = res.headers().clone();
86 let response_body = res.bytes().await?;
87
88 Ok(Response {
89 url: response_url,
90 status,
91 headers: response_headers,
92 body: response_body,
93 request_url: url,
94 meta: request.meta_inner().clone(),
95 cached: false,
96 })
97 }
98}
99
100impl ReqwestClientDownloader {
101 const PROXY_CLIENT_CACHE_MAX_CAPACITY: u64 = 512;
102 const PROXY_CLIENT_CACHE_TTL_SECS: u64 = 30 * 60;
103
104 pub fn new() -> Self {
106 Self::new_with_timeout(Duration::from_secs(30))
107 }
108
109 pub fn new_with_timeout(timeout: Duration) -> Self {
111 match Self::try_new_with_timeout(timeout) {
112 Ok(downloader) => downloader,
113 Err(err) => panic!(
114 "failed to create reqwest downloader with timeout {:?}: {}",
115 timeout, err
116 ),
117 }
118 }
119
120 pub fn try_new_with_timeout(timeout: Duration) -> Result<Self, SpiderError> {
122 let base_client = Client::builder()
123 .timeout(timeout)
124 .pool_max_idle_per_host(200)
125 .pool_idle_timeout(Duration::from_secs(120))
126 .tcp_keepalive(Duration::from_secs(60))
127 .connect_timeout(Duration::from_secs(10))
128 .build()
129 .map_err(|e| SpiderError::ReqwestError(e.into()))?;
130
131 Ok(ReqwestClientDownloader {
132 client: base_client.clone(),
133 timeout,
134 host_clients: Arc::new(DashMap::new()),
135 proxy_clients: Cache::builder()
136 .max_capacity(Self::PROXY_CLIENT_CACHE_MAX_CAPACITY)
137 .time_to_idle(Duration::from_secs(Self::PROXY_CLIENT_CACHE_TTL_SECS))
138 .build(),
139 })
140 }
141
142 async fn get_or_create_host_client(&self, host: &str) -> Client {
144 if let Some(client) = self.host_clients.get(host) {
145 return client.clone();
146 }
147
148 let host_specific_client = Client::builder()
150 .timeout(self.timeout)
151 .pool_max_idle_per_host(50) .pool_idle_timeout(Duration::from_secs(90))
153 .tcp_keepalive(Duration::from_secs(30))
154 .connect_timeout(Duration::from_secs(5))
155 .build();
156 let host_specific_client = match host_specific_client {
157 Ok(client) => client,
158 Err(err) => {
159 warn!(
160 "Failed to build host-specific client for '{}': {}. Falling back to base client",
161 host, err
162 );
163 return self.client.clone();
164 }
165 };
166
167 if let Some(existing) = self.host_clients.get(host) {
168 return existing.clone();
169 }
170 self.host_clients
171 .insert(host.to_string(), host_specific_client.clone());
172
173 host_specific_client
174 }
175
176 async fn get_or_create_proxy_client(&self, proxy_url: &str) -> Result<Client, SpiderError> {
178 if let Some(client) = self.proxy_clients.get(proxy_url) {
179 return Ok(client);
180 }
181
182 let proxy = Proxy::all(proxy_url).map_err(|e| SpiderError::ReqwestError(e.into()))?;
183 let proxy_client = Client::builder()
184 .timeout(self.timeout)
185 .pool_max_idle_per_host(50)
186 .pool_idle_timeout(Duration::from_secs(90))
187 .tcp_keepalive(Duration::from_secs(30))
188 .connect_timeout(Duration::from_secs(5))
189 .proxy(proxy)
190 .build()
191 .map_err(|e| SpiderError::ReqwestError(e.into()))?;
192
193 if let Some(client) = self.proxy_clients.get(proxy_url) {
194 return Ok(client);
195 }
196 self.proxy_clients
197 .insert(proxy_url.to_string(), proxy_client.clone());
198 Ok(proxy_client)
199 }
200}
201
202impl Default for ReqwestClientDownloader {
203 fn default() -> Self {
204 Self::new()
205 }
206}