Skip to main content

spider_downloader/
client.rs

1//! Reqwest-based Downloader implementation for the `spider-lib` framework.
2//!
3//! This module provides `ReqwestClientDownloader`, a concrete implementation
4//! of the `Downloader` trait that leverages the `reqwest` HTTP client library.
5//! It is responsible for executing HTTP requests defined by `Request` objects
6//! and converting the received HTTP responses into `Response` objects suitable
7//! for further processing by the crawler.
8//!
9//! This downloader handles various HTTP methods, request bodies (JSON, form data, bytes),
10//! and integrates with the framework's error handling.
11
12use crate::Downloader;
13use async_trait::async_trait;
14use reqwest::{Client, Proxy};
15use spider_util::error::SpiderError;
16use spider_util::request::{Body, Request};
17use spider_util::response::Response;
18use std::time::Duration;
19use log::info;
20use tokio::sync::RwLock;
21use std::collections::HashMap;
22use std::sync::Arc;
23
24/// Concrete implementation of Downloader using reqwest client
25pub struct ReqwestClientDownloader {
26    client: Client,
27    timeout: Duration,
28    /// Per-host connection pools for better resource management
29    host_clients: Arc<RwLock<HashMap<String, Client>>>,
30}
31
32#[async_trait]
33impl Downloader for ReqwestClientDownloader {
34    type Client = Client;
35
36    /// Returns a reference to the underlying HTTP client.
37    fn client(&self) -> &Self::Client {
38        &self.client
39    }
40
41    async fn download(&self, request: Request) -> Result<Response, SpiderError> {
42        info!(
43            "Downloading {} (fingerprint: {})",
44            request.url,
45            request.fingerprint()
46        );
47
48        let Request {
49            url,
50            method,
51            headers,
52            body,
53            meta,
54            ..
55        } = request;
56
57        // Get host-specific client if available, otherwise use default
58        let host = url.host_str().unwrap_or("").to_string();
59        // Convert DashMap to HashMap for the host client creation
60        let meta_hashmap: std::collections::HashMap<String, serde_json::Value> = 
61            meta.iter().map(|entry| (entry.key().clone().into_owned(), entry.value().clone())).collect();
62        let mut client_to_use = self.get_or_create_host_client(&host, &meta_hashmap).await;
63
64        if let Some(proxy_val) = meta.get("proxy")
65            && let Some(proxy_str) = proxy_val.as_str()
66        {
67            match Proxy::all(proxy_str) {
68                Ok(proxy) => {
69                    let new_client = Client::builder()
70                        .timeout(self.timeout)
71                        .proxy(proxy)
72                        .build()
73                        .map_err(|e| SpiderError::ReqwestError(e.into()))?;
74                    client_to_use = new_client;
75                }
76                Err(e) => {
77                    return Err(SpiderError::ReqwestError(e.into()));
78                }
79            }
80        }
81
82        let mut req_builder = client_to_use.request(method, url.clone());
83
84        if let Some(body_content) = body {
85            req_builder = match body_content {
86                Body::Json(json_val) => req_builder.json(&json_val),
87                Body::Form(form_val) => {
88                    let mut form_map = std::collections::HashMap::new();
89                    for entry in form_val.iter() {
90                        form_map.insert(entry.key().clone(), entry.value().clone());
91                    }
92                    req_builder.form(&form_map)
93                }
94                Body::Bytes(bytes_val) => req_builder.body(bytes_val),
95            };
96        }
97
98        let res = req_builder.headers(headers).send().await?;
99
100        let response_url = res.url().clone();
101        let status = res.status();
102        let response_headers = res.headers().clone();
103        let response_body = res.bytes().await?;
104
105        Ok(Response {
106            url: response_url,
107            status,
108            headers: response_headers,
109            body: response_body,
110            request_url: url,
111            meta,
112            cached: false,
113        })
114    }
115}
116
117impl ReqwestClientDownloader {
118    /// Creates a new `ReqwestClientDownloader` with a default timeout of 30 seconds.
119    pub fn new() -> Self {
120        Self::new_with_timeout(Duration::from_secs(30))
121    }
122
123    /// Creates a new `ReqwestClientDownloader` with a specified request timeout.
124    pub fn new_with_timeout(timeout: Duration) -> Self {
125        let base_client = Client::builder()
126            .timeout(timeout)
127            .pool_max_idle_per_host(200)
128            .pool_idle_timeout(Duration::from_secs(120))
129            .tcp_keepalive(Duration::from_secs(60))
130            .connect_timeout(Duration::from_secs(10))
131            .build()
132            .unwrap();
133            
134        ReqwestClientDownloader {
135            client: base_client.clone(),
136            timeout,
137            host_clients: Arc::new(RwLock::new(HashMap::new())),
138        }
139    }
140
141    /// Gets or creates a host-specific client with optimized settings for that host
142    async fn get_or_create_host_client(&self, host: &str, _meta: &std::collections::HashMap<String, serde_json::Value>) -> Client {
143        {
144            let clients = self.host_clients.read().await;
145            if let Some(client) = clients.get(host) {
146                return client.clone();
147            }
148        }
149
150        // Create a new client for this host with optimized settings
151        let host_specific_client = Client::builder()
152            .timeout(self.timeout)
153            .pool_max_idle_per_host(50) // Smaller pool per host to distribute connections
154            .pool_idle_timeout(Duration::from_secs(90))
155            .tcp_keepalive(Duration::from_secs(30))
156            .connect_timeout(Duration::from_secs(5))
157            .build()
158            .unwrap();
159
160        {
161            let mut clients = self.host_clients.write().await;
162            // Double-check pattern to avoid race condition
163            if let Some(client) = clients.get(host) {
164                return client.clone();
165            }
166            clients.insert(host.to_string(), host_specific_client.clone());
167        }
168
169        host_specific_client
170    }
171}
172
173impl Default for ReqwestClientDownloader {
174    fn default() -> Self {
175        Self::new()
176    }
177}