Skip to main content

reqwest_proxy_pool/
pool.rs

1//! Core proxy pool implementation.
2
3use crate::config::{ProxyPoolConfig, ProxySelectionStrategy};
4use crate::error::NoProxyAvailable;
5use crate::proxy::{Proxy, ProxyStatus};
6use crate::utils;
7
8use futures::future;
9use log::{info, warn};
10use parking_lot::{Mutex, RwLock};
11use rand::RngExt;
12use std::collections::HashSet;
13use std::sync::Arc;
14use std::time::Instant;
15use tokio::time::{self};
16
/// A pool of proxies that can be used for HTTP requests.
///
/// The proxy list sits behind a read/write lock so that selection
/// (`get_proxy`, `get_stats`) can take a shared lock while health checks and
/// success/failure reports take an exclusive one.
pub struct ProxyPool {
    /// All proxies in the pool, healthy or not; callers filter on
    /// `ProxyStatus` when they need only the usable ones.
    proxies: RwLock<Vec<Proxy>>,
    /// Configuration for the pool (sources, health-check settings,
    /// selection strategy, per-proxy rate limit).
    pub config: ProxyPoolConfig,
    /// Index of the most recently returned entry among the healthy proxies;
    /// only used by the round-robin selection strategy.
    last_proxy_index: Mutex<usize>,
}
26
27impl ProxyPool {
28    /// Create a new proxy pool with the given configuration.
29    /// This will fetch proxies from sources and perform health checks synchronously.
30    pub async fn new(config: ProxyPoolConfig) -> Result<Arc<Self>, reqwest::Error> {
31        let pool = Arc::new(Self {
32            proxies: RwLock::new(Vec::new()),
33            config,
34            last_proxy_index: Mutex::new(0),
35        });
36
37        // Initialize proxies from sources
38        pool.initialize_proxies().await?;
39
40        // Perform initial health check synchronously
41        info!("Starting synchronous initial health check");
42        pool.check_all_proxies().await;
43
44        // Display initial stats
45        let (total, healthy) = pool.get_stats();
46        info!(
47            "Initial proxy pool status: {}/{} healthy proxies",
48            healthy, total
49        );
50
51        // Start background health check task
52        let pool_clone = Arc::clone(&pool);
53        tokio::spawn(async move {
54            loop {
55                time::sleep(pool_clone.config.health_check_interval).await;
56                pool_clone.check_all_proxies().await;
57
58                let (total, healthy) = pool_clone.get_stats();
59                info!(
60                    "Proxy pool status update: {}/{} healthy proxies",
61                    healthy, total
62                );
63            }
64        });
65
66        Ok(pool)
67    }
68
69    /// Initialize the proxy pool by fetching proxies from all configured sources.
70    async fn initialize_proxies(&self) -> Result<(), reqwest::Error> {
71        info!(
72            "Initializing proxy pool from {} sources",
73            self.config.sources.len()
74        );
75
76        let mut all_proxies = HashSet::new();
77
78        // Fetch proxies from each source
79        for source in &self.config.sources {
80            match utils::fetch_proxies_from_source(source).await {
81                Ok(source_proxies) => {
82                    info!("Fetched {} proxies from {}", source_proxies.len(), source);
83                    all_proxies.extend(source_proxies);
84                }
85                Err(e) => {
86                    warn!("Failed to fetch proxies from {}: {}", source, e);
87                }
88            }
89        }
90
91        info!(
92            "Found {} unique proxies before health check",
93            all_proxies.len()
94        );
95
96        // Add proxies to the pool
97        {
98            let mut proxies = self.proxies.write();
99            for url in all_proxies {
100                proxies.push(Proxy::new(url, self.config.max_requests_per_second));
101            }
102        }
103
104        Ok(())
105    }
106
107    /// Check the health of all proxies in the pool.
108    pub async fn check_all_proxies(&self) {
109        info!("Starting health check for all proxies");
110
111        let proxies = {
112            let guard = self.proxies.read();
113            guard.clone()
114        };
115
116        let mut futures = Vec::new();
117
118        for proxy in &proxies {
119            let proxy_url = proxy.url.clone();
120            let check_url = self.config.health_check_url.clone();
121            let timeout = self.config.health_check_timeout;
122            let accept_invalid_certs = self.config.danger_accept_invalid_certs;
123
124            let future = async move {
125                let start = Instant::now();
126
127                // Create a client using this proxy
128                let reqwest_proxy = match proxy.to_reqwest_proxy() {
129                    Ok(proxy) => proxy,
130                    Err(_) => return (proxy_url, false, None),
131                };
132
133                let proxy_client = match reqwest::Client::builder()
134                    .timeout(timeout)
135                    .danger_accept_invalid_certs(accept_invalid_certs)
136                    .proxy(reqwest_proxy)
137                    .build()
138                {
139                    Ok(client) => client,
140                    Err(_) => return (proxy_url, false, None),
141                };
142
143                // Test the proxy
144                match proxy_client.get(&check_url).send().await {
145                    Ok(resp) if resp.status().is_success() => {
146                        let elapsed = start.elapsed().as_secs_f64();
147                        (proxy_url, true, Some(elapsed))
148                    }
149                    _ => (proxy_url, false, None),
150                }
151            };
152
153            futures.push(future);
154        }
155
156        // Run all health checks concurrently
157        let results = future::join_all(futures).await;
158
159        let mut healthy_count = 0;
160        let mut unhealthy_count = 0;
161
162        // Update proxy statuses based on health check results
163        {
164            let mut proxies = self.proxies.write();
165
166            for (url, is_healthy, response_time) in results {
167                if let Some(proxy) = proxies.iter_mut().find(|p| p.url == url) {
168                    let old_status = proxy.status;
169
170                    if is_healthy {
171                        proxy.status = ProxyStatus::Healthy;
172                        proxy.response_time = response_time;
173                        healthy_count += 1;
174                    } else {
175                        proxy.status = ProxyStatus::Unhealthy;
176                        unhealthy_count += 1;
177                    }
178
179                    // Log status changes
180                    if old_status != proxy.status {
181                        info!(
182                            "Proxy {} status changed: {:?} -> {:?}",
183                            proxy.url, old_status, proxy.status
184                        );
185                    }
186
187                    proxy.last_check = Instant::now();
188                }
189            }
190        }
191
192        info!(
193            "Health check completed: {} healthy, {} unhealthy",
194            healthy_count, unhealthy_count
195        );
196    }
197
198    /// Get a proxy from the pool according to the configured selection strategy.
199    pub fn get_proxy(&self) -> Result<Proxy, NoProxyAvailable> {
200        let proxies = self.proxies.read();
201
202        // Filter healthy proxies
203        let healthy_proxies: Vec<&Proxy> = proxies
204            .iter()
205            .filter(|p| p.status == ProxyStatus::Healthy)
206            .collect();
207
208        if healthy_proxies.is_empty() {
209            return Err(NoProxyAvailable);
210        }
211
212        // Select a proxy based on the configured strategy
213        let selected = match self.config.selection_strategy {
214            ProxySelectionStrategy::FastestResponse => {
215                // Select the proxy with the fastest response time
216                healthy_proxies
217                    .iter()
218                    .min_by(|a, b| {
219                        a.response_time
220                            .unwrap_or(f64::MAX)
221                            .partial_cmp(&b.response_time.unwrap_or(f64::MAX))
222                            .unwrap_or(std::cmp::Ordering::Equal)
223                    })
224                    .unwrap()
225            }
226            ProxySelectionStrategy::MostReliable => {
227                // Select the proxy with the highest success rate
228                healthy_proxies
229                    .iter()
230                    .max_by(|a, b| {
231                        a.success_rate()
232                            .partial_cmp(&b.success_rate())
233                            .unwrap_or(std::cmp::Ordering::Equal)
234                    })
235                    .unwrap()
236            }
237            ProxySelectionStrategy::Random => {
238                // Select a random healthy proxy
239                let mut rng = rand::rng();
240                let idx = rng.random_range(0..healthy_proxies.len());
241                &healthy_proxies[idx]
242            }
243            ProxySelectionStrategy::RoundRobin => {
244                // Round-robin selection
245                let mut last_index = self.last_proxy_index.lock();
246                *last_index = (*last_index + 1) % healthy_proxies.len();
247                &healthy_proxies[*last_index]
248            }
249        };
250
251        Ok((*selected).clone())
252    }
253
254    /// Report a successful request through a proxy.
255    pub fn report_proxy_success(&self, url: &str) {
256        let mut proxies = self.proxies.write();
257        if let Some(proxy) = proxies.iter_mut().find(|p| p.url == url) {
258            proxy.success_count += 1;
259            proxy.status = ProxyStatus::Healthy;
260        }
261    }
262
263    /// Report a failed request through a proxy.
264    pub fn report_proxy_failure(&self, url: &str) {
265        let mut proxies = self.proxies.write();
266        if let Some(proxy) = proxies.iter_mut().find(|p| p.url == url) {
267            proxy.failure_count += 1;
268
269            // Mark as unhealthy if failure ratio is too high
270            let failure_ratio =
271                proxy.failure_count as f64 / (proxy.success_count + proxy.failure_count) as f64;
272
273            if failure_ratio > 0.5 && proxy.failure_count >= 3 {
274                let old_status = proxy.status;
275                proxy.status = ProxyStatus::Unhealthy;
276
277                if old_status != ProxyStatus::Unhealthy {
278                    warn!(
279                        "Proxy {} marked unhealthy: {} failures, {} successes",
280                        proxy.url, proxy.failure_count, proxy.success_count
281                    );
282                }
283            }
284        }
285    }
286
287    /// Get statistics about the proxy pool.
288    pub fn get_stats(&self) -> (usize, usize) {
289        let proxies = self.proxies.read();
290        let total = proxies.len();
291        let healthy = proxies
292            .iter()
293            .filter(|p| p.status == ProxyStatus::Healthy)
294            .count();
295
296        (total, healthy)
297    }
298}