// spider_downloader/client.rs
//! Reqwest-based Downloader implementation for the `spider-lib` framework.
//!
//! This module provides `ReqwestClientDownloader`, a concrete implementation
//! of the `Downloader` trait that leverages the `reqwest` HTTP client library.
//! It is responsible for executing HTTP requests defined by `Request` objects
//! and converting the received HTTP responses into `Response` objects suitable
//! for further processing by the crawler.
//!
//! This downloader handles various HTTP methods, request bodies (JSON, form data, bytes),
//! and integrates with the framework's error handling.

use crate::Downloader;
use async_trait::async_trait;
use log::{debug, warn};
use moka::sync::Cache;
use reqwest::{Client, Proxy};
use spider_util::error::SpiderError;
use spider_util::request::{Body, Request};
use spider_util::response::Response;
use std::time::Duration;
/// Concrete implementation of Downloader using reqwest client.
///
/// Holds one shared base client plus a bounded cache of per-proxy clients so
/// that requests routed through the same proxy reuse pooled connections.
pub struct ReqwestClientDownloader {
    /// Shared base client used for all requests that do not specify a proxy.
    client: Client,
    /// Timeout applied to the base client and to every per-proxy client built
    /// by this downloader.
    timeout: Duration,
    /// Per-proxy clients with TTL/capacity bounds to avoid unbounded growth.
    proxy_clients: Cache<String, Client>,
}
30#[async_trait]
31impl Downloader for ReqwestClientDownloader {
32    type Client = Client;
33
34    /// Returns a reference to the underlying HTTP client.
35    fn client(&self) -> &Self::Client {
36        &self.client
37    }
38
39    async fn download(&self, request: Request) -> Result<Response, SpiderError> {
40        debug!(
41            "Downloading {} (fingerprint: {})",
42            request.url,
43            request.fingerprint()
44        );
45
46        let url = request.url.clone();
47        let body = request.body.clone();
48        let client_to_use = self.select_client_for_request(&request);
49
50        let mut req_builder = client_to_use.request(request.method.clone(), url.clone());
51
52        if let Some(body_content) = body {
53            req_builder = match body_content {
54                Body::Json(json_val) => req_builder.json(&json_val),
55                Body::Form(form_val) => {
56                    let mut form_map = std::collections::HashMap::new();
57                    for entry in form_val.iter() {
58                        form_map.insert(entry.key().clone(), entry.value().clone());
59                    }
60                    req_builder.form(&form_map)
61                }
62                Body::Bytes(bytes_val) => req_builder.body(bytes_val),
63            };
64        }
65
66        let res = req_builder.headers(request.headers.clone()).send().await?;
67
68        let response_url = res.url().clone();
69        let status = res.status();
70        let response_headers = res.headers().clone();
71        let response_body = res.bytes().await?;
72
73        Ok(Response {
74            url: response_url,
75            status,
76            headers: response_headers,
77            body: response_body,
78            request_url: url,
79            meta: request.meta_inner().clone(),
80            cached: false,
81        })
82    }
83}
85impl ReqwestClientDownloader {
86    const PROXY_CLIENT_CACHE_MAX_CAPACITY: u64 = 512;
87    const PROXY_CLIENT_CACHE_TTL_SECS: u64 = 30 * 60;
88    const PROXY_META_KEY: &str = "proxy";
89
90    /// Creates a new `ReqwestClientDownloader` with a default timeout of 30 seconds.
91    pub fn new() -> Self {
92        Self::new_with_timeout(Duration::from_secs(30))
93    }
94
95    /// Creates a new `ReqwestClientDownloader` with a specified request timeout.
96    pub fn new_with_timeout(timeout: Duration) -> Self {
97        match Self::try_new_with_timeout(timeout) {
98            Ok(downloader) => downloader,
99            Err(err) => panic!(
100                "failed to create reqwest downloader with timeout {:?}: {}",
101                timeout, err
102            ),
103        }
104    }
105
106    /// Tries to create a new `ReqwestClientDownloader` with a specified request timeout.
107    pub fn try_new_with_timeout(timeout: Duration) -> Result<Self, SpiderError> {
108        let base_client = Client::builder()
109            .timeout(timeout)
110            .pool_max_idle_per_host(200)
111            .pool_idle_timeout(Duration::from_secs(120))
112            .tcp_keepalive(Duration::from_secs(60))
113            .connect_timeout(Duration::from_secs(10))
114            .build()
115            .map_err(|e| SpiderError::ReqwestError(e.into()))?;
116
117        Ok(ReqwestClientDownloader {
118            client: base_client.clone(),
119            timeout,
120            proxy_clients: Cache::builder()
121                .max_capacity(Self::PROXY_CLIENT_CACHE_MAX_CAPACITY)
122                .time_to_idle(Duration::from_secs(Self::PROXY_CLIENT_CACHE_TTL_SECS))
123                .build(),
124        })
125    }
126
127    fn proxy_from_request(request: &Request) -> Option<String> {
128        request.meta_inner().as_ref().and_then(|meta_map| {
129            meta_map
130                .get(Self::PROXY_META_KEY)
131                .and_then(|proxy_val| proxy_val.as_str().map(str::to_owned))
132        })
133    }
134
135    fn select_client_for_request(&self, request: &Request) -> Client {
136        if let Some(proxy_url) = Self::proxy_from_request(request)
137            && let Some(proxy_client) = self.get_or_create_proxy_client(&proxy_url)
138        {
139            return proxy_client;
140        }
141
142        self.client.clone()
143    }
144
145    /// Gets or creates a proxy-specific client to preserve connection pooling per proxy endpoint.
146    fn get_or_create_proxy_client(&self, proxy_url: &str) -> Option<Client> {
147        if let Some(client) = self.proxy_clients.get(proxy_url) {
148            return Some(client);
149        }
150
151        let proxy = match Proxy::all(proxy_url) {
152            Ok(proxy) => proxy,
153            Err(err) => {
154                warn!(
155                    "Invalid proxy URL '{}': {}. Falling back to base client",
156                    proxy_url, err
157                );
158                return None;
159            }
160        };
161
162        let proxy_client = match Client::builder()
163            .timeout(self.timeout)
164            .pool_max_idle_per_host(50)
165            .pool_idle_timeout(Duration::from_secs(90))
166            .tcp_keepalive(Duration::from_secs(30))
167            .connect_timeout(Duration::from_secs(5))
168            .proxy(proxy)
169            .build()
170        {
171            Ok(client) => client,
172            Err(err) => {
173                warn!(
174                    "Failed to build client for proxy '{}': {}. Falling back to base client",
175                    proxy_url, err
176                );
177                return None;
178            }
179        };
180
181        if let Some(client) = self.proxy_clients.get(proxy_url) {
182            return Some(client);
183        }
184        self.proxy_clients
185            .insert(proxy_url.to_string(), proxy_client.clone());
186        Some(proxy_client)
187    }
188}
impl Default for ReqwestClientDownloader {
    /// Equivalent to [`ReqwestClientDownloader::new`]: 30-second timeout.
    /// Like `new`, this panics if the base client cannot be built.
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
    use super::ReqwestClientDownloader;
    use spider_util::request::Request;

    /// A request whose meta carries no "proxy" entry yields no proxy URL.
    #[test]
    fn proxy_meta_parsing_returns_none_when_missing() {
        let request = Request::new(reqwest::Url::parse("https://example.com").expect("valid url"));
        assert_eq!(ReqwestClientDownloader::proxy_from_request(&request), None);
    }

    /// An unparsable proxy URL is rejected without panicking and is not cached,
    /// so the downloader silently falls back to the base client.
    #[test]
    fn invalid_proxy_falls_back_without_error() {
        let downloader = ReqwestClientDownloader::new();
        let proxy_client = downloader.get_or_create_proxy_client("://invalid-proxy");
        assert!(proxy_client.is_none());
        assert_eq!(downloader.proxy_clients.entry_count(), 0);
    }

    /// A well-formed proxy URL produces a client that is cached on first use
    /// and remains available for subsequent lookups.
    /// (No connection is made here; only the client is constructed.)
    #[test]
    fn valid_proxy_client_is_cached_and_reused() {
        let downloader = ReqwestClientDownloader::new();
        let proxy = "http://127.0.0.1:8080";

        let first = downloader.get_or_create_proxy_client(proxy);
        assert!(first.is_some());
        assert!(downloader.proxy_clients.get(proxy).is_some());

        let second = downloader.get_or_create_proxy_client(proxy);
        assert!(second.is_some());
        assert!(downloader.proxy_clients.get(proxy).is_some());
    }

    /// Selecting a client for a proxy-less request must not populate the
    /// proxy-client cache.
    #[test]
    fn request_without_proxy_uses_base_client_path() {
        let downloader = ReqwestClientDownloader::new();
        let request = Request::new(reqwest::Url::parse("https://example.com").expect("valid url"));

        let _ = downloader.select_client_for_request(&request);
        assert_eq!(downloader.proxy_clients.entry_count(), 0);
    }
}