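// reqwest_proxy_pool/pool.rs
//! Core proxy pool implementation: loading proxies from configured sources,
//! periodic health checks, and per-request proxy selection.
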
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Instant;

use futures::future;
use log::{info, warn};
use parking_lot::{Mutex, RwLock};
use rand::Rng;
use tokio::time;

use crate::config::{ProxyPoolConfig, ProxySelectionStrategy};
use crate::error::NoProxyAvailable;
use crate::proxy::{Proxy, ProxyStatus};
use crate::utils;

17/// A pool of proxies that can be used for HTTP requests.
18pub struct ProxyPool {
19    /// All proxies in the pool.
20    proxies: RwLock<Vec<Proxy>>,
21    /// Configuration for the pool.
22    pub config: ProxyPoolConfig,
23    /// Used for round-robin proxy selection.
24    last_proxy_index: Mutex<usize>,
25}
26
27impl ProxyPool {
28    /// Create a new proxy pool with the given configuration.
29    /// This will fetch proxies from sources and perform health checks synchronously.
30    pub async fn new(config: ProxyPoolConfig) -> Result<Arc<Self>, reqwest::Error> {
31        let pool = Arc::new(Self {
32            proxies: RwLock::new(Vec::new()),
33            config,
34            last_proxy_index: Mutex::new(0),
35        });
36        
37        // Initialize proxies from sources
38        pool.initialize_proxies().await?;
39        
40        // Perform initial health check synchronously
41        info!("Starting synchronous initial health check");
42        pool.check_all_proxies().await;
43        
44        // Display initial stats
45        let (total, healthy) = pool.get_stats();
46        info!("Initial proxy pool status: {}/{} healthy proxies", healthy, total);
47        
48        // Start background health check task
49        let pool_clone = Arc::clone(&pool);
50        tokio::spawn(async move {
51            loop {
52                time::sleep(pool_clone.config.health_check_interval).await;
53                pool_clone.check_all_proxies().await;
54                
55                let (total, healthy) = pool_clone.get_stats();
56                info!("Proxy pool status update: {}/{} healthy proxies", healthy, total);
57            }
58        });
59        
60        Ok(pool)
61    }
62    
63    /// Initialize the proxy pool by fetching proxies from all configured sources.
64    async fn initialize_proxies(&self) -> Result<(), reqwest::Error> {
65        info!("Initializing proxy pool from {} sources", self.config.sources.len());
66        
67        let mut all_proxies = HashSet::new();
68        
69        // Fetch proxies from each source
70        for source in &self.config.sources {
71            match utils::fetch_proxies_from_source(source).await {
72                Ok(source_proxies) => {
73                    info!("Fetched {} proxies from {}", source_proxies.len(), source);
74                    all_proxies.extend(source_proxies);
75                }
76                Err(e) => {
77                    warn!("Failed to fetch proxies from {}: {}", source, e);
78                }
79            }
80        }
81        
82        info!("Found {} unique proxies before health check", all_proxies.len());
83        
84        // Add proxies to the pool
85        {
86            let mut proxies = self.proxies.write();
87            for url in all_proxies {
88                proxies.push(Proxy::new(url, self.config.max_requests_per_second));
89            }
90        }
91        
92        Ok(())
93    }
94    
95    /// Check the health of all proxies in the pool.
96    pub async fn check_all_proxies(&self) {
97        info!("Starting health check for all proxies");
98        
99        let proxies = {
100            let guard = self.proxies.read();
101            guard.clone()
102        };
103        
104        let mut futures = Vec::new();
105        
106        for proxy in &proxies {
107            let proxy_url = proxy.url.clone();
108            let check_url = self.config.health_check_url.clone();
109            let timeout = self.config.health_check_timeout;
110            
111            let future = async move {
112                let start = Instant::now();
113                
114                // Create a client using this proxy
115                let proxy_client = match reqwest::Client::builder()
116                    .timeout(timeout)
117                    .proxy(reqwest::Proxy::all(&proxy_url).unwrap_or_else(|_| {
118                        // 正确指定返回类型为 Option<reqwest::Url>
119                        reqwest::Proxy::custom(move |_| -> Option<reqwest::Url> { None })
120                    }))
121                    .build() {
122                    Ok(client) => client,
123                    Err(_) => return (proxy_url, false, None),
124                };
125                
126                // Test the proxy
127                match proxy_client.get(&check_url).send().await {
128                    Ok(resp) if resp.status().is_success() => {
129                        let elapsed = start.elapsed().as_secs_f64();
130                        (proxy_url, true, Some(elapsed))
131                    }
132                    _ => (proxy_url, false, None),
133                }
134            };
135            
136            futures.push(future);
137        }
138        
139        // Run all health checks concurrently
140        let results = future::join_all(futures).await;
141        
142        let mut healthy_count = 0;
143        let mut unhealthy_count = 0;
144        
145        // Update proxy statuses based on health check results
146        {
147            let mut proxies = self.proxies.write();
148            
149            for (url, is_healthy, response_time) in results {
150                if let Some(proxy) = proxies.iter_mut().find(|p| p.url == url) {
151                    let old_status = proxy.status;
152                    
153                    if is_healthy {
154                        proxy.status = ProxyStatus::Healthy;
155                        proxy.response_time = response_time;
156                        healthy_count += 1;
157                    } else {
158                        proxy.status = ProxyStatus::Unhealthy;
159                        unhealthy_count += 1;
160                    }
161                    
162                    // Log status changes
163                    if old_status != proxy.status {
164                        info!("Proxy {} status changed: {:?} -> {:?}", 
165                            proxy.url, old_status, proxy.status);
166                    }
167                    
168                    proxy.last_check = Instant::now();
169                }
170            }
171        }
172        
173        info!("Health check completed: {} healthy, {} unhealthy", 
174            healthy_count, unhealthy_count);
175    }
176    
177    /// Get a proxy from the pool according to the configured selection strategy.
178    pub fn get_proxy(&self) -> Result<Proxy, NoProxyAvailable> {
179        let proxies = self.proxies.read();
180        
181        // Filter healthy proxies
182        let healthy_proxies: Vec<&Proxy> = proxies.iter()
183            .filter(|p| p.status == ProxyStatus::Healthy)
184            .collect();
185            
186        if healthy_proxies.is_empty() {
187            return Err(NoProxyAvailable);
188        }
189        
190        // Select a proxy based on the configured strategy
191        let selected = match self.config.selection_strategy {
192            ProxySelectionStrategy::FastestResponse => {
193                // Select the proxy with the fastest response time
194                healthy_proxies.iter()
195                    .min_by(|a, b| {
196                        a.response_time.unwrap_or(f64::MAX)
197                        .partial_cmp(&b.response_time.unwrap_or(f64::MAX))
198                        .unwrap_or(std::cmp::Ordering::Equal)
199                    })
200                    .unwrap()
201            },
202            ProxySelectionStrategy::MostReliable => {
203                // Select the proxy with the highest success rate
204                healthy_proxies.iter()
205                    .max_by(|a, b| {
206                        a.success_rate().partial_cmp(&b.success_rate())
207                        .unwrap_or(std::cmp::Ordering::Equal)
208                    })
209                    .unwrap()
210            },
211            ProxySelectionStrategy::Random => {
212                // Select a random healthy proxy
213                let mut rng = rand::rng();
214                let idx = rng.random_range(0..healthy_proxies.len());
215                &healthy_proxies[idx]
216            },
217            ProxySelectionStrategy::RoundRobin => {
218                // Round-robin selection
219                let mut last_index = self.last_proxy_index.lock();
220                *last_index = (*last_index + 1) % healthy_proxies.len();
221                &healthy_proxies[*last_index]
222            }
223        };
224            
225        Ok((*selected).clone())
226    }
227    
228    /// Report a successful request through a proxy.
229    pub fn report_proxy_success(&self, url: &str) {
230        let mut proxies = self.proxies.write();
231        if let Some(proxy) = proxies.iter_mut().find(|p| p.url == url) {
232            proxy.success_count += 1;
233            proxy.status = ProxyStatus::Healthy;
234        }
235    }
236    
237    /// Report a failed request through a proxy.
238    pub fn report_proxy_failure(&self, url: &str) {
239        let mut proxies = self.proxies.write();
240        if let Some(proxy) = proxies.iter_mut().find(|p| p.url == url) {
241            proxy.failure_count += 1;
242            
243            // Mark as unhealthy if failure ratio is too high
244            let failure_ratio = proxy.failure_count as f64 / 
245                (proxy.success_count + proxy.failure_count) as f64;
246                
247            if failure_ratio > 0.5 && proxy.failure_count >= 3 {
248                let old_status = proxy.status;
249                proxy.status = ProxyStatus::Unhealthy;
250                
251                if old_status != ProxyStatus::Unhealthy {
252                    warn!("Proxy {} marked unhealthy: {} failures, {} successes", 
253                        proxy.url, proxy.failure_count, proxy.success_count);
254                }
255            }
256        }
257    }
258    
259    /// Get statistics about the proxy pool.
260    pub fn get_stats(&self) -> (usize, usize) {
261        let proxies = self.proxies.read();
262        let total = proxies.len();
263        let healthy = proxies.iter()
264            .filter(|p| p.status == ProxyStatus::Healthy)
265            .count();
266            
267        (total, healthy)
268    }
269}