Skip to main content

reqwest_proxy_pool/
pool.rs

1//! Core proxy pool implementation.
2
3use crate::config::{HostConfig, ProxySelectionStrategy};
4use crate::error::NoProxyAvailable;
5use crate::proxy::{Proxy, ProxyStatus};
6use crate::utils;
7
8use futures::future;
9use log::{info, warn};
10use parking_lot::{Mutex, RwLock};
11use rand::RngExt;
12use std::collections::HashSet;
13use std::sync::Arc;
14use std::time::Instant;
15use tokio::time::{self};
16
17/// A pool of proxies that can be used for HTTP requests.
18pub struct ProxyPool {
19    /// Source URLs to fetch proxy lists from.
20    sources: Vec<String>,
21    /// All proxies in the pool.
22    proxies: RwLock<Vec<Proxy>>,
23    /// Host-level configuration for the pool.
24    pub config: HostConfig,
25    /// Used for round-robin proxy selection.
26    last_proxy_index: Mutex<usize>,
27}
28
29impl ProxyPool {
30    /// Create a new proxy pool with the given configuration.
31    /// This will fetch proxies from sources and perform health checks synchronously.
32    pub async fn new(
33        sources: Vec<String>,
34        config: HostConfig,
35    ) -> Result<Arc<Self>, reqwest::Error> {
36        let pool = Arc::new(Self {
37            sources,
38            proxies: RwLock::new(Vec::new()),
39            config,
40            last_proxy_index: Mutex::new(0),
41        });
42
43        // Initialize proxies from sources
44        pool.initialize_proxies().await?;
45
46        // Perform initial health check synchronously
47        info!("Starting synchronous initial health check");
48        pool.check_all_proxies().await;
49
50        // Display initial stats
51        let (total, healthy) = pool.get_stats();
52        info!(
53            "Initial proxy pool status: {}/{} healthy proxies",
54            healthy, total
55        );
56        if healthy < pool.config.min_available_proxies {
57            warn!(
58                "Healthy proxies below minimum for host [{}]: {} < {}",
59                pool.config.host(),
60                healthy,
61                pool.config.min_available_proxies
62            );
63        }
64
65        // Start background health check task
66        let pool_clone = Arc::clone(&pool);
67        tokio::spawn(async move {
68            loop {
69                time::sleep(pool_clone.config.health_check_interval).await;
70                pool_clone.check_all_proxies().await;
71
72                let (total, healthy) = pool_clone.get_stats();
73                info!(
74                    "Proxy pool status update: {}/{} healthy proxies",
75                    healthy, total
76                );
77                if healthy < pool_clone.config.min_available_proxies {
78                    warn!(
79                        "Healthy proxies below minimum for host [{}]: {} < {}",
80                        pool_clone.config.host(),
81                        healthy,
82                        pool_clone.config.min_available_proxies
83                    );
84                }
85            }
86        });
87
88        Ok(pool)
89    }
90
91    /// Initialize the proxy pool by fetching proxies from all configured sources.
92    async fn initialize_proxies(&self) -> Result<(), reqwest::Error> {
93        info!(
94            "Initializing proxy pool from {} sources",
95            self.sources.len()
96        );
97
98        let mut all_proxies = HashSet::new();
99
100        // Fetch proxies from each source
101        for source in &self.sources {
102            match utils::fetch_proxies_from_source(source).await {
103                Ok(source_proxies) => {
104                    info!("Fetched {} proxies from {}", source_proxies.len(), source);
105                    all_proxies.extend(source_proxies);
106                }
107                Err(e) => {
108                    warn!("Failed to fetch proxies from {}: {}", source, e);
109                }
110            }
111        }
112
113        info!(
114            "Found {} unique proxies before health check",
115            all_proxies.len()
116        );
117
118        // Add proxies to the pool
119        {
120            let mut proxies = self.proxies.write();
121            for url in all_proxies {
122                proxies.push(Proxy::new(url, self.config.min_request_interval_ms));
123            }
124        }
125
126        Ok(())
127    }
128
129    /// Check the health of all proxies in the pool.
130    pub async fn check_all_proxies(&self) {
131        info!("Starting health check for all proxies");
132
133        let proxies = {
134            let guard = self.proxies.read();
135            guard.clone()
136        };
137        let mut futures = Vec::new();
138
139        for proxy in &proxies {
140            let proxy_url = proxy.url.clone();
141            let check_url = self.config.health_check_url.clone();
142            let timeout = self.config.health_check_timeout;
143
144            let future = async move {
145                let start = Instant::now();
146
147                // Create a client using this proxy
148                let reqwest_proxy = match proxy.to_reqwest_proxy() {
149                    Ok(proxy) => proxy,
150                    Err(_) => return (proxy_url, false, None),
151                };
152
153                let proxy_client = match reqwest::Client::builder()
154                    .timeout(timeout)
155                    .danger_accept_invalid_certs(true)
156                    .proxy(reqwest_proxy)
157                    .build()
158                {
159                    Ok(client) => client,
160                    Err(_) => return (proxy_url, false, None),
161                };
162
163                // Test the proxy.
164                match proxy_client.get(&check_url).send().await {
165                    Ok(resp) if resp.status().is_success() => {
166                        let elapsed = start.elapsed().as_secs_f64();
167                        (proxy_url, true, Some(elapsed))
168                    }
169                    _ => (proxy_url, false, None),
170                }
171            };
172
173            futures.push(future);
174        }
175
176        // Run all health checks concurrently
177        let results = future::join_all(futures).await;
178
179        let mut healthy_count = 0;
180        let mut unhealthy_count = 0;
181
182        // Update proxy statuses based on health check results
183        {
184            let mut proxies = self.proxies.write();
185
186            for (url, is_healthy, response_time) in results {
187                if let Some(proxy) = proxies.iter_mut().find(|p| p.url == url) {
188                    let old_status = proxy.status;
189
190                    if is_healthy {
191                        proxy.status = ProxyStatus::Healthy;
192                        proxy.response_time = response_time;
193                        proxy.cooldown_until = None;
194                        healthy_count += 1;
195                    } else {
196                        proxy.status = ProxyStatus::Unhealthy;
197                        proxy.cooldown_until = Some(Instant::now() + self.config.proxy_cooldown);
198                        unhealthy_count += 1;
199                    }
200
201                    // Log status changes
202                    if old_status != proxy.status {
203                        info!(
204                            "Proxy {} status changed: {:?} -> {:?}",
205                            proxy.url, old_status, proxy.status
206                        );
207                    }
208
209                    proxy.last_check = Instant::now();
210                }
211            }
212        }
213
214        info!(
215            "Health check completed: {} healthy, {} unhealthy",
216            healthy_count, unhealthy_count
217        );
218    }
219
220    /// Get a proxy from the pool according to the configured selection strategy.
221    pub fn get_proxy(&self) -> Result<Proxy, NoProxyAvailable> {
222        self.get_proxy_internal(None)
223    }
224
225    /// Get a proxy while excluding specific proxy URLs.
226    pub fn get_proxy_excluding(
227        &self,
228        excluded: &HashSet<String>,
229    ) -> Result<Proxy, NoProxyAvailable> {
230        self.get_proxy_internal(Some(excluded))
231    }
232
233    fn get_proxy_internal(
234        &self,
235        excluded: Option<&HashSet<String>>,
236    ) -> Result<Proxy, NoProxyAvailable> {
237        let now = Instant::now();
238        let mut proxies = self.proxies.write();
239
240        // Move cooled-down proxies into half-open so they can be probed by real traffic.
241        for proxy in proxies.iter_mut() {
242            if proxy.status == ProxyStatus::Unhealthy
243                && proxy
244                    .cooldown_until
245                    .is_some_and(|cooldown_until| cooldown_until <= now)
246            {
247                proxy.status = ProxyStatus::HalfOpen;
248                proxy.cooldown_until = None;
249            }
250        }
251
252        // Filter selectable proxies.
253        let healthy_proxies: Vec<&Proxy> = proxies
254            .iter()
255            .filter(|p| matches!(p.status, ProxyStatus::Healthy | ProxyStatus::HalfOpen))
256            .filter(|p| excluded.map(|urls| !urls.contains(&p.url)).unwrap_or(true))
257            .collect();
258
259        if healthy_proxies.is_empty() {
260            return Err(NoProxyAvailable);
261        }
262
263        // Select a proxy based on the configured strategy
264        let selected = match self.config.selection_strategy {
265            ProxySelectionStrategy::FastestResponse => {
266                // Select the proxy with the fastest response time
267                healthy_proxies
268                    .iter()
269                    .min_by(|a, b| {
270                        a.response_time
271                            .unwrap_or(f64::MAX)
272                            .partial_cmp(&b.response_time.unwrap_or(f64::MAX))
273                            .unwrap_or(std::cmp::Ordering::Equal)
274                    })
275                    .unwrap()
276            }
277            ProxySelectionStrategy::MostReliable => {
278                // Select the proxy with the highest success rate
279                healthy_proxies
280                    .iter()
281                    .max_by(|a, b| {
282                        a.success_rate()
283                            .partial_cmp(&b.success_rate())
284                            .unwrap_or(std::cmp::Ordering::Equal)
285                    })
286                    .unwrap()
287            }
288            ProxySelectionStrategy::TopKReliableRandom => {
289                let mut ranked = healthy_proxies;
290                ranked.sort_by(|a, b| {
291                    b.success_rate()
292                        .partial_cmp(&a.success_rate())
293                        .unwrap_or(std::cmp::Ordering::Equal)
294                });
295                let top_k = self.config.reliable_top_k.min(ranked.len()).max(1);
296                let mut rng = rand::rng();
297                let idx = rng.random_range(0..top_k);
298                ranked[idx]
299            }
300            ProxySelectionStrategy::Random => {
301                // Select a random healthy proxy
302                let mut rng = rand::rng();
303                let idx = rng.random_range(0..healthy_proxies.len());
304                healthy_proxies[idx]
305            }
306            ProxySelectionStrategy::RoundRobin => {
307                // Round-robin selection
308                let mut last_index = self.last_proxy_index.lock();
309                *last_index = (*last_index + 1) % healthy_proxies.len();
310                healthy_proxies[*last_index]
311            }
312        };
313
314        Ok((*selected).clone())
315    }
316
317    /// Report a successful request through a proxy.
318    pub fn report_proxy_success(&self, url: &str) {
319        let mut proxies = self.proxies.write();
320        if let Some(proxy) = proxies.iter_mut().find(|p| p.url == url) {
321            proxy.success_count += 1;
322            proxy.status = ProxyStatus::Healthy;
323            proxy.cooldown_until = None;
324        }
325    }
326
327    /// Report a failed request through a proxy.
328    pub fn report_proxy_failure(&self, url: &str) {
329        let mut proxies = self.proxies.write();
330        if let Some(proxy) = proxies.iter_mut().find(|p| p.url == url) {
331            proxy.failure_count += 1;
332            let old_status = proxy.status;
333            proxy.status = ProxyStatus::Unhealthy;
334            proxy.cooldown_until = Some(Instant::now() + self.config.proxy_cooldown);
335
336            if old_status != ProxyStatus::Unhealthy {
337                warn!(
338                    "Proxy {} marked unhealthy: {} failures, {} successes, cooldown {:?}",
339                    proxy.url, proxy.failure_count, proxy.success_count, self.config.proxy_cooldown
340                );
341            }
342        }
343    }
344
345    /// Get statistics about the proxy pool.
346    pub fn get_stats(&self) -> (usize, usize) {
347        let proxies = self.proxies.read();
348        let total = proxies.len();
349        let healthy = proxies
350            .iter()
351            .filter(|p| matches!(p.status, ProxyStatus::Healthy | ProxyStatus::HalfOpen))
352            .count();
353
354        (total, healthy)
355    }
356}