// spider_downloader/client.rs
use crate::Downloader;
13use async_trait::async_trait;
14use log::info;
15use reqwest::{Client, Proxy};
16use spider_util::error::SpiderError;
17use spider_util::request::{Body, Request};
18use spider_util::response::Response;
19use std::collections::HashMap;
20use std::sync::Arc;
21use std::time::Duration;
22use tokio::sync::RwLock;
23
24pub struct ReqwestClientDownloader {
26 client: Client,
27 timeout: Duration,
28 host_clients: Arc<RwLock<HashMap<String, Client>>>,
30}
31
32#[async_trait]
33impl Downloader for ReqwestClientDownloader {
34 type Client = Client;
35
36 fn client(&self) -> &Self::Client {
38 &self.client
39 }
40
41 async fn download(&self, request: Request) -> Result<Response, SpiderError> {
42 info!(
43 "Downloading {} (fingerprint: {})",
44 request.url,
45 request.fingerprint()
46 );
47
48 let url = request.url.clone();
49 let body = request.body.clone();
50
51 let host = url.host_str().unwrap_or("").to_string();
53 let mut client_to_use = self.get_or_create_host_client(&host).await;
54
55 if let Some(meta_map) = request.meta_inner().as_ref()
57 && let Some(proxy_val) = meta_map.get("proxy")
58 && let Some(proxy_str) = proxy_val.as_str()
59 {
60 match Proxy::all(proxy_str) {
61 Ok(proxy) => {
62 let new_client = Client::builder()
63 .timeout(self.timeout)
64 .proxy(proxy)
65 .build()
66 .map_err(|e| SpiderError::ReqwestError(e.into()))?;
67 client_to_use = new_client;
68 }
69 Err(e) => {
70 return Err(SpiderError::ReqwestError(e.into()));
71 }
72 }
73 }
74
75 let mut req_builder = client_to_use.request(request.method.clone(), url.clone());
76
77 if let Some(body_content) = body {
78 req_builder = match body_content {
79 Body::Json(json_val) => req_builder.json(&json_val),
80 Body::Form(form_val) => {
81 let mut form_map = std::collections::HashMap::new();
82 for entry in form_val.iter() {
83 form_map.insert(entry.key().clone(), entry.value().clone());
84 }
85 req_builder.form(&form_map)
86 }
87 Body::Bytes(bytes_val) => req_builder.body(bytes_val),
88 };
89 }
90
91 let res = req_builder.headers(request.headers.clone()).send().await?;
92
93 let response_url = res.url().clone();
94 let status = res.status();
95 let response_headers = res.headers().clone();
96 let response_body = res.bytes().await?;
97
98 Ok(Response {
99 url: response_url,
100 status,
101 headers: response_headers,
102 body: response_body,
103 request_url: url,
104 meta: request.meta_inner().clone(),
105 cached: false,
106 })
107 }
108}
109
110impl ReqwestClientDownloader {
111 pub fn new() -> Self {
113 Self::new_with_timeout(Duration::from_secs(30))
114 }
115
116 pub fn new_with_timeout(timeout: Duration) -> Self {
118 let base_client = Client::builder()
119 .timeout(timeout)
120 .pool_max_idle_per_host(200)
121 .pool_idle_timeout(Duration::from_secs(120))
122 .tcp_keepalive(Duration::from_secs(60))
123 .connect_timeout(Duration::from_secs(10))
124 .build()
125 .unwrap();
126
127 ReqwestClientDownloader {
128 client: base_client.clone(),
129 timeout,
130 host_clients: Arc::new(RwLock::new(HashMap::new())),
131 }
132 }
133
134 async fn get_or_create_host_client(&self, host: &str) -> Client {
136 {
137 let clients = self.host_clients.read().await;
138 if let Some(client) = clients.get(host) {
139 return client.clone();
140 }
141 }
142
143 let host_specific_client = Client::builder()
145 .timeout(self.timeout)
146 .pool_max_idle_per_host(50) .pool_idle_timeout(Duration::from_secs(90))
148 .tcp_keepalive(Duration::from_secs(30))
149 .connect_timeout(Duration::from_secs(5))
150 .build()
151 .unwrap();
152
153 {
154 let mut clients = self.host_clients.write().await;
155 if let Some(client) = clients.get(host) {
157 return client.clone();
158 }
159 clients.insert(host.to_string(), host_specific_client.clone());
160 }
161
162 host_specific_client
163 }
164}
165
166impl Default for ReqwestClientDownloader {
167 fn default() -> Self {
168 Self::new()
169 }
170}