Skip to main content

spider_downloader/
client.rs

//! Reqwest-based Downloader implementation for the `spider-lib` framework.
//!
//! This module provides `ReqwestClientDownloader`, a concrete implementation
//! of the `Downloader` trait that uses the `reqwest` HTTP client library.
//! It is responsible for executing HTTP requests defined by `Request` objects
//! and converting the received HTTP responses into `Response` objects suitable
//! for further processing by the crawler.
//!
//! This downloader handles various HTTP methods, request bodies (JSON, form data, bytes),
//! an optional per-request `proxy` entry in the request metadata, and integrates with
//! the framework's error handling.

use crate::Downloader;
use async_trait::async_trait;
use log::{info, warn};
use reqwest::{Client, Proxy};
use spider_util::error::SpiderError;
use spider_util::request::{Body, Request};
use spider_util::response::Response;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;

24/// Concrete implementation of Downloader using reqwest client
25pub struct ReqwestClientDownloader {
26    client: Client,
27    timeout: Duration,
28    /// Per-host connection pools for better resource management
29    host_clients: Arc<RwLock<HashMap<String, Client>>>,
30}
31
32#[async_trait]
33impl Downloader for ReqwestClientDownloader {
34    type Client = Client;
35
36    /// Returns a reference to the underlying HTTP client.
37    fn client(&self) -> &Self::Client {
38        &self.client
39    }
40
41    async fn download(&self, request: Request) -> Result<Response, SpiderError> {
42        info!(
43            "Downloading {} (fingerprint: {})",
44            request.url,
45            request.fingerprint()
46        );
47
48        let url = request.url.clone();
49        let body = request.body.clone();
50
51        // Get host-specific client if available, otherwise use default
52        let host = url.host_str().unwrap_or("").to_string();
53        let mut client_to_use = self.get_or_create_host_client(&host).await;
54
55        // Check for proxy in metadata
56        if let Some(meta_map) = request.meta_inner().as_ref()
57            && let Some(proxy_val) = meta_map.get("proxy")
58            && let Some(proxy_str) = proxy_val.as_str()
59        {
60            match Proxy::all(proxy_str) {
61                Ok(proxy) => {
62                    let new_client = Client::builder()
63                        .timeout(self.timeout)
64                        .proxy(proxy)
65                        .build()
66                        .map_err(|e| SpiderError::ReqwestError(e.into()))?;
67                    client_to_use = new_client;
68                }
69                Err(e) => {
70                    return Err(SpiderError::ReqwestError(e.into()));
71                }
72            }
73        }
74
75        let mut req_builder = client_to_use.request(request.method.clone(), url.clone());
76
77        if let Some(body_content) = body {
78            req_builder = match body_content {
79                Body::Json(json_val) => req_builder.json(&json_val),
80                Body::Form(form_val) => {
81                    let mut form_map = std::collections::HashMap::new();
82                    for entry in form_val.iter() {
83                        form_map.insert(entry.key().clone(), entry.value().clone());
84                    }
85                    req_builder.form(&form_map)
86                }
87                Body::Bytes(bytes_val) => req_builder.body(bytes_val),
88            };
89        }
90
91        let res = req_builder.headers(request.headers.clone()).send().await?;
92
93        let response_url = res.url().clone();
94        let status = res.status();
95        let response_headers = res.headers().clone();
96        let response_body = res.bytes().await?;
97
98        Ok(Response {
99            url: response_url,
100            status,
101            headers: response_headers,
102            body: response_body,
103            request_url: url,
104            meta: request.meta_inner().clone(),
105            cached: false,
106        })
107    }
108}
109
110impl ReqwestClientDownloader {
111    /// Creates a new `ReqwestClientDownloader` with a default timeout of 30 seconds.
112    pub fn new() -> Self {
113        Self::new_with_timeout(Duration::from_secs(30))
114    }
115
116    /// Creates a new `ReqwestClientDownloader` with a specified request timeout.
117    pub fn new_with_timeout(timeout: Duration) -> Self {
118        let base_client = Client::builder()
119            .timeout(timeout)
120            .pool_max_idle_per_host(200)
121            .pool_idle_timeout(Duration::from_secs(120))
122            .tcp_keepalive(Duration::from_secs(60))
123            .connect_timeout(Duration::from_secs(10))
124            .build()
125            .unwrap();
126
127        ReqwestClientDownloader {
128            client: base_client.clone(),
129            timeout,
130            host_clients: Arc::new(RwLock::new(HashMap::new())),
131        }
132    }
133
134    /// Gets or creates a host-specific client with optimized settings for that host
135    async fn get_or_create_host_client(&self, host: &str) -> Client {
136        {
137            let clients = self.host_clients.read().await;
138            if let Some(client) = clients.get(host) {
139                return client.clone();
140            }
141        }
142
143        // Create a new client for this host with optimized settings
144        let host_specific_client = Client::builder()
145            .timeout(self.timeout)
146            .pool_max_idle_per_host(50) // Smaller pool per host to distribute connections
147            .pool_idle_timeout(Duration::from_secs(90))
148            .tcp_keepalive(Duration::from_secs(30))
149            .connect_timeout(Duration::from_secs(5))
150            .build()
151            .unwrap();
152
153        {
154            let mut clients = self.host_clients.write().await;
155            // Double-check pattern to avoid race condition
156            if let Some(client) = clients.get(host) {
157                return client.clone();
158            }
159            clients.insert(host.to_string(), host_specific_client.clone());
160        }
161
162        host_specific_client
163    }
164}
165
166impl Default for ReqwestClientDownloader {
167    fn default() -> Self {
168        Self::new()
169    }
170}