Skip to main content

reqwest_proxy_pool/
pool.rs

1//! Core proxy pool implementation.
2
3use crate::config::{HostConfig, ProxySelectionStrategy};
4use crate::error::NoProxyAvailable;
5use crate::proxy::{Proxy, ProxyStatus};
6use crate::utils;
7
8use futures::future;
9use log::{info, warn};
10use parking_lot::{Mutex, RwLock};
11use rand::RngExt;
12use std::collections::HashSet;
13use std::sync::Arc;
14use std::time::Instant;
15use tokio::time::{self};
16
17/// A pool of proxies that can be used for HTTP requests.
18pub struct ProxyPool {
19    /// Source URLs to fetch proxy lists from.
20    sources: Vec<String>,
21    /// All proxies in the pool.
22    proxies: RwLock<Vec<Proxy>>,
23    /// Host-level configuration for the pool.
24    pub config: HostConfig,
25    /// Used for round-robin proxy selection.
26    last_proxy_index: Mutex<usize>,
27}
28
29impl ProxyPool {
30    /// Create a new proxy pool with the given configuration.
31    /// This will fetch proxies from sources and perform health checks synchronously.
32    pub async fn new(
33        sources: Vec<String>,
34        config: HostConfig,
35    ) -> Result<Arc<Self>, reqwest::Error> {
36        let pool = Arc::new(Self {
37            sources,
38            proxies: RwLock::new(Vec::new()),
39            config,
40            last_proxy_index: Mutex::new(0),
41        });
42
43        // Initialize proxies from sources
44        pool.initialize_proxies().await?;
45
46        // Perform initial health check synchronously
47        info!("Starting synchronous initial health check");
48        pool.check_all_proxies().await;
49
50        // Display initial stats
51        let (total, healthy) = pool.get_stats();
52        info!(
53            "Initial proxy pool status: {}/{} healthy proxies",
54            healthy, total
55        );
56        if healthy < pool.config.min_available_proxies {
57            warn!(
58                "Healthy proxies below minimum for host [{}]: {} < {}",
59                pool.config.host(),
60                healthy,
61                pool.config.min_available_proxies
62            );
63        }
64
65        // Start background health check task
66        let pool_clone = Arc::clone(&pool);
67        tokio::spawn(async move {
68            loop {
69                time::sleep(pool_clone.config.health_check_interval).await;
70                pool_clone.check_all_proxies().await;
71
72                let (total, healthy) = pool_clone.get_stats();
73                info!(
74                    "Proxy pool status update: {}/{} healthy proxies",
75                    healthy, total
76                );
77                if healthy < pool_clone.config.min_available_proxies {
78                    warn!(
79                        "Healthy proxies below minimum for host [{}]: {} < {}",
80                        pool_clone.config.host(),
81                        healthy,
82                        pool_clone.config.min_available_proxies
83                    );
84                }
85            }
86        });
87
88        Ok(pool)
89    }
90
91    /// Initialize the proxy pool by fetching proxies from all configured sources.
92    async fn initialize_proxies(&self) -> Result<(), reqwest::Error> {
93        info!(
94            "Initializing proxy pool from {} sources",
95            self.sources.len()
96        );
97
98        let mut all_proxies = HashSet::new();
99
100        // Fetch proxies from each source
101        for source in &self.sources {
102            match utils::fetch_proxies_from_source(source).await {
103                Ok(source_proxies) => {
104                    info!("Fetched {} proxies from {}", source_proxies.len(), source);
105                    all_proxies.extend(source_proxies);
106                }
107                Err(e) => {
108                    warn!("Failed to fetch proxies from {}: {}", source, e);
109                }
110            }
111        }
112
113        info!(
114            "Found {} unique proxies before health check",
115            all_proxies.len()
116        );
117
118        // Add proxies to the pool
119        {
120            let mut proxies = self.proxies.write();
121            for url in all_proxies {
122                proxies.push(Proxy::new(url, self.config.min_request_interval_ms));
123            }
124        }
125
126        Ok(())
127    }
128
129    /// Check the health of all proxies in the pool.
130    pub async fn check_all_proxies(&self) {
131        info!("Starting health check for all proxies");
132
133        let proxies = {
134            let guard = self.proxies.read();
135            guard.clone()
136        };
137        let mut futures = Vec::new();
138
139        for proxy in &proxies {
140            let proxy_url = proxy.url.clone();
141            let check_url = self.config.health_check_url.clone();
142            let timeout = self.config.health_check_timeout;
143            let accept_invalid_certs = self.config.danger_accept_invalid_certs;
144
145            let future = async move {
146                let start = Instant::now();
147
148                // Create a client using this proxy
149                let reqwest_proxy = match proxy.to_reqwest_proxy() {
150                    Ok(proxy) => proxy,
151                    Err(_) => return (proxy_url, false, None),
152                };
153
154                let proxy_client = match reqwest::Client::builder()
155                    .timeout(timeout)
156                    .danger_accept_invalid_certs(accept_invalid_certs)
157                    .proxy(reqwest_proxy)
158                    .build()
159                {
160                    Ok(client) => client,
161                    Err(_) => return (proxy_url, false, None),
162                };
163
164                // Test the proxy.
165                match proxy_client.get(&check_url).send().await {
166                    Ok(resp) if resp.status().is_success() => {
167                        let elapsed = start.elapsed().as_secs_f64();
168                        (proxy_url, true, Some(elapsed))
169                    }
170                    _ => (proxy_url, false, None),
171                }
172            };
173
174            futures.push(future);
175        }
176
177        // Run all health checks concurrently
178        let results = future::join_all(futures).await;
179
180        let mut healthy_count = 0;
181        let mut unhealthy_count = 0;
182
183        // Update proxy statuses based on health check results
184        {
185            let mut proxies = self.proxies.write();
186
187            for (url, is_healthy, response_time) in results {
188                if let Some(proxy) = proxies.iter_mut().find(|p| p.url == url) {
189                    let old_status = proxy.status;
190
191                    if is_healthy {
192                        proxy.status = ProxyStatus::Healthy;
193                        proxy.response_time = response_time;
194                        healthy_count += 1;
195                    } else {
196                        proxy.status = ProxyStatus::Unhealthy;
197                        unhealthy_count += 1;
198                    }
199
200                    // Log status changes
201                    if old_status != proxy.status {
202                        info!(
203                            "Proxy {} status changed: {:?} -> {:?}",
204                            proxy.url, old_status, proxy.status
205                        );
206                    }
207
208                    proxy.last_check = Instant::now();
209                }
210            }
211        }
212
213        info!(
214            "Health check completed: {} healthy, {} unhealthy",
215            healthy_count, unhealthy_count
216        );
217    }
218
219    /// Get a proxy from the pool according to the configured selection strategy.
220    pub fn get_proxy(&self) -> Result<Proxy, NoProxyAvailable> {
221        self.get_proxy_internal(None)
222    }
223
224    /// Get a proxy while excluding specific proxy URLs.
225    pub fn get_proxy_excluding(
226        &self,
227        excluded: &HashSet<String>,
228    ) -> Result<Proxy, NoProxyAvailable> {
229        self.get_proxy_internal(Some(excluded))
230    }
231
232    fn get_proxy_internal(
233        &self,
234        excluded: Option<&HashSet<String>>,
235    ) -> Result<Proxy, NoProxyAvailable> {
236        let proxies = self.proxies.read();
237
238        // Filter healthy proxies
239        let healthy_proxies: Vec<&Proxy> = proxies
240            .iter()
241            .filter(|p| p.status == ProxyStatus::Healthy)
242            .filter(|p| excluded.map(|urls| !urls.contains(&p.url)).unwrap_or(true))
243            .collect();
244
245        if healthy_proxies.is_empty() {
246            return Err(NoProxyAvailable);
247        }
248
249        // Select a proxy based on the configured strategy
250        let selected = match self.config.selection_strategy {
251            ProxySelectionStrategy::FastestResponse => {
252                // Select the proxy with the fastest response time
253                healthy_proxies
254                    .iter()
255                    .min_by(|a, b| {
256                        a.response_time
257                            .unwrap_or(f64::MAX)
258                            .partial_cmp(&b.response_time.unwrap_or(f64::MAX))
259                            .unwrap_or(std::cmp::Ordering::Equal)
260                    })
261                    .unwrap()
262            }
263            ProxySelectionStrategy::MostReliable => {
264                // Select the proxy with the highest success rate
265                healthy_proxies
266                    .iter()
267                    .max_by(|a, b| {
268                        a.success_rate()
269                            .partial_cmp(&b.success_rate())
270                            .unwrap_or(std::cmp::Ordering::Equal)
271                    })
272                    .unwrap()
273            }
274            ProxySelectionStrategy::Random => {
275                // Select a random healthy proxy
276                let mut rng = rand::rng();
277                let idx = rng.random_range(0..healthy_proxies.len());
278                &healthy_proxies[idx]
279            }
280            ProxySelectionStrategy::RoundRobin => {
281                // Round-robin selection
282                let mut last_index = self.last_proxy_index.lock();
283                *last_index = (*last_index + 1) % healthy_proxies.len();
284                &healthy_proxies[*last_index]
285            }
286        };
287
288        Ok((*selected).clone())
289    }
290
291    /// Report a successful request through a proxy.
292    pub fn report_proxy_success(&self, url: &str) {
293        let mut proxies = self.proxies.write();
294        if let Some(proxy) = proxies.iter_mut().find(|p| p.url == url) {
295            proxy.success_count += 1;
296            proxy.status = ProxyStatus::Healthy;
297        }
298    }
299
300    /// Report a failed request through a proxy.
301    pub fn report_proxy_failure(&self, url: &str) {
302        let mut proxies = self.proxies.write();
303        if let Some(proxy) = proxies.iter_mut().find(|p| p.url == url) {
304            proxy.failure_count += 1;
305
306            // Mark as unhealthy if failure ratio is too high
307            let failure_ratio =
308                proxy.failure_count as f64 / (proxy.success_count + proxy.failure_count) as f64;
309
310            if failure_ratio > 0.5 && proxy.failure_count >= 3 {
311                let old_status = proxy.status;
312                proxy.status = ProxyStatus::Unhealthy;
313
314                if old_status != ProxyStatus::Unhealthy {
315                    warn!(
316                        "Proxy {} marked unhealthy: {} failures, {} successes",
317                        proxy.url, proxy.failure_count, proxy.success_count
318                    );
319                }
320            }
321        }
322    }
323
324    /// Get statistics about the proxy pool.
325    pub fn get_stats(&self) -> (usize, usize) {
326        let proxies = self.proxies.read();
327        let total = proxies.len();
328        let healthy = proxies
329            .iter()
330            .filter(|p| p.status == ProxyStatus::Healthy)
331            .count();
332
333        (total, healthy)
334    }
335}