// spider_downloader/client.rs
//! Reqwest-based Downloader implementation for the `spider-lib` framework.
//!
//! This module provides `ReqwestClientDownloader`, a concrete implementation
//! of the `Downloader` trait that leverages the `reqwest` HTTP client library.
//! It is responsible for executing HTTP requests defined by `Request` objects
//! and converting the received HTTP responses into `Response` objects suitable
//! for further processing by the crawler.
//!
//! This downloader handles various HTTP methods, request bodies (JSON, form data, bytes),
//! and integrates with the framework's error handling.

use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use dashmap::DashMap;
use log::{debug, warn};
use moka::sync::Cache;
use reqwest::{Client, Proxy};
use spider_util::error::SpiderError;
use spider_util::request::{Body, Request};
use spider_util::response::Response;

use crate::Downloader;

/// Concrete implementation of Downloader using reqwest client
pub struct ReqwestClientDownloader {
    /// Base client, used as a fallback when a host-specific client cannot be built.
    client: Client,
    /// Request timeout applied to every client this downloader constructs.
    timeout: Duration,
    /// Per-host connection pools for better resource management
    host_clients: Arc<DashMap<String, Client>>,
    /// Per-proxy clients with TTL/capacity bounds to avoid unbounded growth.
    proxy_clients: Cache<String, Client>,
}

34#[async_trait]
35impl Downloader for ReqwestClientDownloader {
36    type Client = Client;
37
38    /// Returns a reference to the underlying HTTP client.
39    fn client(&self) -> &Self::Client {
40        &self.client
41    }
42
43    async fn download(&self, request: Request) -> Result<Response, SpiderError> {
44        debug!(
45            "Downloading {} (fingerprint: {})",
46            request.url,
47            request.fingerprint()
48        );
49
50        let url = request.url.clone();
51        let body = request.body.clone();
52
53        // Get host-specific client if available, otherwise use default
54        let host = url.host_str().unwrap_or("").to_string();
55        let mut client_to_use = self.get_or_create_host_client(&host).await;
56
57        // Check for proxy in metadata
58        if let Some(meta_map) = request.meta_inner().as_ref()
59            && let Some(proxy_val) = meta_map.get("proxy")
60            && let Some(proxy_str) = proxy_val.as_str()
61        {
62            client_to_use = self.get_or_create_proxy_client(proxy_str).await?;
63        }
64
65        let mut req_builder = client_to_use.request(request.method.clone(), url.clone());
66
67        if let Some(body_content) = body {
68            req_builder = match body_content {
69                Body::Json(json_val) => req_builder.json(&json_val),
70                Body::Form(form_val) => {
71                    let mut form_map = std::collections::HashMap::new();
72                    for entry in form_val.iter() {
73                        form_map.insert(entry.key().clone(), entry.value().clone());
74                    }
75                    req_builder.form(&form_map)
76                }
77                Body::Bytes(bytes_val) => req_builder.body(bytes_val),
78            };
79        }
80
81        let res = req_builder.headers(request.headers.clone()).send().await?;
82
83        let response_url = res.url().clone();
84        let status = res.status();
85        let response_headers = res.headers().clone();
86        let response_body = res.bytes().await?;
87
88        Ok(Response {
89            url: response_url,
90            status,
91            headers: response_headers,
92            body: response_body,
93            request_url: url,
94            meta: request.meta_inner().clone(),
95            cached: false,
96        })
97    }
98}
99
100impl ReqwestClientDownloader {
101    const PROXY_CLIENT_CACHE_MAX_CAPACITY: u64 = 512;
102    const PROXY_CLIENT_CACHE_TTL_SECS: u64 = 30 * 60;
103
104    /// Creates a new `ReqwestClientDownloader` with a default timeout of 30 seconds.
105    pub fn new() -> Self {
106        Self::new_with_timeout(Duration::from_secs(30))
107    }
108
109    /// Creates a new `ReqwestClientDownloader` with a specified request timeout.
110    pub fn new_with_timeout(timeout: Duration) -> Self {
111        match Self::try_new_with_timeout(timeout) {
112            Ok(downloader) => downloader,
113            Err(err) => panic!(
114                "failed to create reqwest downloader with timeout {:?}: {}",
115                timeout, err
116            ),
117        }
118    }
119
120    /// Tries to create a new `ReqwestClientDownloader` with a specified request timeout.
121    pub fn try_new_with_timeout(timeout: Duration) -> Result<Self, SpiderError> {
122        let base_client = Client::builder()
123            .timeout(timeout)
124            .pool_max_idle_per_host(200)
125            .pool_idle_timeout(Duration::from_secs(120))
126            .tcp_keepalive(Duration::from_secs(60))
127            .connect_timeout(Duration::from_secs(10))
128            .build()
129            .map_err(|e| SpiderError::ReqwestError(e.into()))?;
130
131        Ok(ReqwestClientDownloader {
132            client: base_client.clone(),
133            timeout,
134            host_clients: Arc::new(DashMap::new()),
135            proxy_clients: Cache::builder()
136                .max_capacity(Self::PROXY_CLIENT_CACHE_MAX_CAPACITY)
137                .time_to_idle(Duration::from_secs(Self::PROXY_CLIENT_CACHE_TTL_SECS))
138                .build(),
139        })
140    }
141
142    /// Gets or creates a host-specific client with optimized settings for that host
143    async fn get_or_create_host_client(&self, host: &str) -> Client {
144        if let Some(client) = self.host_clients.get(host) {
145            return client.clone();
146        }
147
148        // Create a new client for this host with optimized settings
149        let host_specific_client = Client::builder()
150            .timeout(self.timeout)
151            .pool_max_idle_per_host(50) // Smaller pool per host to distribute connections
152            .pool_idle_timeout(Duration::from_secs(90))
153            .tcp_keepalive(Duration::from_secs(30))
154            .connect_timeout(Duration::from_secs(5))
155            .build();
156        let host_specific_client = match host_specific_client {
157            Ok(client) => client,
158            Err(err) => {
159                warn!(
160                    "Failed to build host-specific client for '{}': {}. Falling back to base client",
161                    host, err
162                );
163                return self.client.clone();
164            }
165        };
166
167        if let Some(existing) = self.host_clients.get(host) {
168            return existing.clone();
169        }
170        self.host_clients
171            .insert(host.to_string(), host_specific_client.clone());
172
173        host_specific_client
174    }
175
176    /// Gets or creates a proxy-specific client to preserve connection pooling per proxy endpoint.
177    async fn get_or_create_proxy_client(&self, proxy_url: &str) -> Result<Client, SpiderError> {
178        if let Some(client) = self.proxy_clients.get(proxy_url) {
179            return Ok(client);
180        }
181
182        let proxy = Proxy::all(proxy_url).map_err(|e| SpiderError::ReqwestError(e.into()))?;
183        let proxy_client = Client::builder()
184            .timeout(self.timeout)
185            .pool_max_idle_per_host(50)
186            .pool_idle_timeout(Duration::from_secs(90))
187            .tcp_keepalive(Duration::from_secs(30))
188            .connect_timeout(Duration::from_secs(5))
189            .proxy(proxy)
190            .build()
191            .map_err(|e| SpiderError::ReqwestError(e.into()))?;
192
193        if let Some(client) = self.proxy_clients.get(proxy_url) {
194            return Ok(client);
195        }
196        self.proxy_clients
197            .insert(proxy_url.to_string(), proxy_client.clone());
198        Ok(proxy_client)
199    }
200}
201
202impl Default for ReqwestClientDownloader {
203    fn default() -> Self {
204        Self::new()
205    }
206}