spider_downloader/
client.rs1use crate::Downloader;
13use async_trait::async_trait;
14use reqwest::{Client, Proxy};
15use spider_util::error::SpiderError;
16use spider_util::request::{Body, Request};
17use spider_util::response::Response;
18use std::time::Duration;
19use log::info;
20use tokio::sync::RwLock;
21use std::collections::HashMap;
22use std::sync::Arc;
23
/// A [`Downloader`] backed by `reqwest` that keeps a cache of per-host clients.
pub struct ReqwestClientDownloader {
    /// Base client built by the constructor; this is what `Downloader::client` returns.
    client: Client,
    /// Request timeout applied to every client this downloader builds.
    timeout: Duration,
    /// Clients created on demand, keyed by the host string of the request URL.
    host_clients: Arc<RwLock<HashMap<String, Client>>>,
}
31
32#[async_trait]
33impl Downloader for ReqwestClientDownloader {
34 type Client = Client;
35
36 fn client(&self) -> &Self::Client {
38 &self.client
39 }
40
41 async fn download(&self, request: Request) -> Result<Response, SpiderError> {
42 info!(
43 "Downloading {} (fingerprint: {})",
44 request.url,
45 request.fingerprint()
46 );
47
48 let Request {
49 url,
50 method,
51 headers,
52 body,
53 meta,
54 ..
55 } = request;
56
57 let host = url.host_str().unwrap_or("").to_string();
59 let meta_hashmap: std::collections::HashMap<String, serde_json::Value> =
61 meta.iter().map(|entry| (entry.key().clone().into_owned(), entry.value().clone())).collect();
62 let mut client_to_use = self.get_or_create_host_client(&host, &meta_hashmap).await;
63
64 if let Some(proxy_val) = meta.get("proxy")
65 && let Some(proxy_str) = proxy_val.as_str()
66 {
67 match Proxy::all(proxy_str) {
68 Ok(proxy) => {
69 let new_client = Client::builder()
70 .timeout(self.timeout)
71 .proxy(proxy)
72 .build()
73 .map_err(|e| SpiderError::ReqwestError(e.into()))?;
74 client_to_use = new_client;
75 }
76 Err(e) => {
77 return Err(SpiderError::ReqwestError(e.into()));
78 }
79 }
80 }
81
82 let mut req_builder = client_to_use.request(method, url.clone());
83
84 if let Some(body_content) = body {
85 req_builder = match body_content {
86 Body::Json(json_val) => req_builder.json(&json_val),
87 Body::Form(form_val) => {
88 let mut form_map = std::collections::HashMap::new();
89 for entry in form_val.iter() {
90 form_map.insert(entry.key().clone(), entry.value().clone());
91 }
92 req_builder.form(&form_map)
93 }
94 Body::Bytes(bytes_val) => req_builder.body(bytes_val),
95 };
96 }
97
98 let res = req_builder.headers(headers).send().await?;
99
100 let response_url = res.url().clone();
101 let status = res.status();
102 let response_headers = res.headers().clone();
103 let response_body = res.bytes().await?;
104
105 Ok(Response {
106 url: response_url,
107 status,
108 headers: response_headers,
109 body: response_body,
110 request_url: url,
111 meta,
112 cached: false,
113 })
114 }
115}
116
117impl ReqwestClientDownloader {
118 pub fn new() -> Self {
120 Self::new_with_timeout(Duration::from_secs(30))
121 }
122
123 pub fn new_with_timeout(timeout: Duration) -> Self {
125 let base_client = Client::builder()
126 .timeout(timeout)
127 .pool_max_idle_per_host(200)
128 .pool_idle_timeout(Duration::from_secs(120))
129 .tcp_keepalive(Duration::from_secs(60))
130 .connect_timeout(Duration::from_secs(10))
131 .build()
132 .unwrap();
133
134 ReqwestClientDownloader {
135 client: base_client.clone(),
136 timeout,
137 host_clients: Arc::new(RwLock::new(HashMap::new())),
138 }
139 }
140
141 async fn get_or_create_host_client(&self, host: &str, _meta: &std::collections::HashMap<String, serde_json::Value>) -> Client {
143 {
144 let clients = self.host_clients.read().await;
145 if let Some(client) = clients.get(host) {
146 return client.clone();
147 }
148 }
149
150 let host_specific_client = Client::builder()
152 .timeout(self.timeout)
153 .pool_max_idle_per_host(50) .pool_idle_timeout(Duration::from_secs(90))
155 .tcp_keepalive(Duration::from_secs(30))
156 .connect_timeout(Duration::from_secs(5))
157 .build()
158 .unwrap();
159
160 {
161 let mut clients = self.host_clients.write().await;
162 if let Some(client) = clients.get(host) {
164 return client.clone();
165 }
166 clients.insert(host.to_string(), host_specific_client.clone());
167 }
168
169 host_specific_client
170 }
171}
172
173impl Default for ReqwestClientDownloader {
174 fn default() -> Self {
175 Self::new()
176 }
177}