spider_downloader/
reqwest_client.rs1use crate::{Downloader, SimpleHttpClient};
13use async_trait::async_trait;
14use bytes::Bytes;
15use http::StatusCode;
16use reqwest::{Client, Proxy};
17use spider_util::error::SpiderError;
18use spider_util::request::{Body, Request};
19use spider_util::response::Response;
20use std::time::Duration;
21use tracing::info;
22
23#[async_trait]
24impl SimpleHttpClient for Client {
25 async fn get_text(
26 &self,
27 url: &str,
28 timeout: Duration,
29 ) -> Result<(StatusCode, Bytes), SpiderError> {
30 let resp = self.get(url).timeout(timeout).send().await?;
31 let status = resp.status();
32 let body = resp.bytes().await?;
33 Ok((status, body))
34 }
35}
36
37pub struct ReqwestClientDownloader {
39 client: Client,
40 timeout: Duration,
41}
42
43#[async_trait]
44impl Downloader for ReqwestClientDownloader {
45 type Client = Client;
46
47 fn client(&self) -> &Self::Client {
49 &self.client
50 }
51
52 async fn download(&self, request: Request) -> Result<Response, SpiderError> {
53 info!(
54 "Downloading {} (fingerprint: {})",
55 request.url,
56 request.fingerprint()
57 );
58
59 let Request {
60 url,
61 method,
62 headers,
63 body,
64 meta,
65 ..
66 } = request;
67
68 let mut client_to_use = self.client.clone();
69
70 if let Some(proxy_val) = meta.get("proxy")
71 && let Some(proxy_str) = proxy_val.as_str()
72 {
73 match Proxy::all(proxy_str) {
74 Ok(proxy) => {
75 let new_client = Client::builder()
76 .timeout(self.timeout)
77 .proxy(proxy)
78 .build()
79 .map_err(|e| SpiderError::ReqwestError(e.into()))?;
80 client_to_use = new_client;
81 }
82 Err(e) => {
83 return Err(SpiderError::ReqwestError(e.into()));
84 }
85 }
86 }
87
88 let mut req_builder = client_to_use.request(method, url.clone());
89
90 if let Some(body_content) = body {
91 req_builder = match body_content {
92 Body::Json(json_val) => req_builder.json(&json_val),
93 Body::Form(form_val) => {
94 let mut form_map = std::collections::HashMap::new();
95 for entry in form_val.iter() {
96 form_map.insert(entry.key().clone(), entry.value().clone());
97 }
98 req_builder.form(&form_map)
99 }
100 Body::Bytes(bytes_val) => req_builder.body(bytes_val),
101 };
102 }
103
104 let res = req_builder.headers(headers).send().await?;
105
106 let response_url = res.url().clone();
107 let status = res.status();
108 let response_headers = res.headers().clone();
109 let response_body = res.bytes().await?;
110
111 Ok(Response {
112 url: response_url,
113 status,
114 headers: response_headers,
115 body: response_body,
116 request_url: url,
117 meta,
118 cached: false,
119 })
120 }
121}
122
123impl ReqwestClientDownloader {
124 pub fn new() -> Self {
126 Self::new_with_timeout(Duration::from_secs(30))
127 }
128
129 pub fn new_with_timeout(timeout: Duration) -> Self {
131 ReqwestClientDownloader {
132 client: Client::builder()
133 .timeout(timeout)
134 .pool_max_idle_per_host(200)
135 .pool_idle_timeout(Duration::from_secs(120))
136 .tcp_keepalive(Duration::from_secs(60))
137 .connect_timeout(Duration::from_secs(10))
138 .build()
139 .unwrap(),
140 timeout,
141 }
142 }
143}
144
145impl Default for ReqwestClientDownloader {
146 fn default() -> Self {
147 Self::new()
148 }
149}