reqwest_proxy_pool/
pool.rs1use crate::config::{ProxyPoolConfig, ProxySelectionStrategy};
4use crate::error::NoProxyAvailable;
5use crate::proxy::{Proxy, ProxyStatus};
6use crate::utils;
7
8use futures::future;
9use log::{info, warn};
10use parking_lot::{Mutex, RwLock};
11use rand::RngExt;
12use std::collections::HashSet;
13use std::sync::Arc;
14use std::time::Instant;
15use tokio::time::{self};
16
17pub struct ProxyPool {
19 proxies: RwLock<Vec<Proxy>>,
21 pub config: ProxyPoolConfig,
23 last_proxy_index: Mutex<usize>,
25}
26
27impl ProxyPool {
28 pub async fn new(config: ProxyPoolConfig) -> Result<Arc<Self>, reqwest::Error> {
31 let pool = Arc::new(Self {
32 proxies: RwLock::new(Vec::new()),
33 config,
34 last_proxy_index: Mutex::new(0),
35 });
36
37 pool.initialize_proxies().await?;
39
40 info!("Starting synchronous initial health check");
42 pool.check_all_proxies().await;
43
44 let (total, healthy) = pool.get_stats();
46 info!(
47 "Initial proxy pool status: {}/{} healthy proxies",
48 healthy, total
49 );
50
51 let pool_clone = Arc::clone(&pool);
53 tokio::spawn(async move {
54 loop {
55 time::sleep(pool_clone.config.health_check_interval).await;
56 pool_clone.check_all_proxies().await;
57
58 let (total, healthy) = pool_clone.get_stats();
59 info!(
60 "Proxy pool status update: {}/{} healthy proxies",
61 healthy, total
62 );
63 }
64 });
65
66 Ok(pool)
67 }
68
69 async fn initialize_proxies(&self) -> Result<(), reqwest::Error> {
71 info!(
72 "Initializing proxy pool from {} sources",
73 self.config.sources.len()
74 );
75
76 let mut all_proxies = HashSet::new();
77
78 for source in &self.config.sources {
80 match utils::fetch_proxies_from_source(source).await {
81 Ok(source_proxies) => {
82 info!("Fetched {} proxies from {}", source_proxies.len(), source);
83 all_proxies.extend(source_proxies);
84 }
85 Err(e) => {
86 warn!("Failed to fetch proxies from {}: {}", source, e);
87 }
88 }
89 }
90
91 info!(
92 "Found {} unique proxies before health check",
93 all_proxies.len()
94 );
95
96 {
98 let mut proxies = self.proxies.write();
99 for url in all_proxies {
100 proxies.push(Proxy::new(url, self.config.max_requests_per_second));
101 }
102 }
103
104 Ok(())
105 }
106
107 pub async fn check_all_proxies(&self) {
109 info!("Starting health check for all proxies");
110
111 let proxies = {
112 let guard = self.proxies.read();
113 guard.clone()
114 };
115
116 let mut futures = Vec::new();
117
118 for proxy in &proxies {
119 let proxy_url = proxy.url.clone();
120 let check_url = self.config.health_check_url.clone();
121 let timeout = self.config.health_check_timeout;
122 let accept_invalid_certs = self.config.danger_accept_invalid_certs;
123
124 let future = async move {
125 let start = Instant::now();
126
127 let reqwest_proxy = match proxy.to_reqwest_proxy() {
129 Ok(proxy) => proxy,
130 Err(_) => return (proxy_url, false, None),
131 };
132
133 let proxy_client = match reqwest::Client::builder()
134 .timeout(timeout)
135 .danger_accept_invalid_certs(accept_invalid_certs)
136 .proxy(reqwest_proxy)
137 .build()
138 {
139 Ok(client) => client,
140 Err(_) => return (proxy_url, false, None),
141 };
142
143 match proxy_client.get(&check_url).send().await {
145 Ok(resp) if resp.status().is_success() => {
146 let elapsed = start.elapsed().as_secs_f64();
147 (proxy_url, true, Some(elapsed))
148 }
149 _ => (proxy_url, false, None),
150 }
151 };
152
153 futures.push(future);
154 }
155
156 let results = future::join_all(futures).await;
158
159 let mut healthy_count = 0;
160 let mut unhealthy_count = 0;
161
162 {
164 let mut proxies = self.proxies.write();
165
166 for (url, is_healthy, response_time) in results {
167 if let Some(proxy) = proxies.iter_mut().find(|p| p.url == url) {
168 let old_status = proxy.status;
169
170 if is_healthy {
171 proxy.status = ProxyStatus::Healthy;
172 proxy.response_time = response_time;
173 healthy_count += 1;
174 } else {
175 proxy.status = ProxyStatus::Unhealthy;
176 unhealthy_count += 1;
177 }
178
179 if old_status != proxy.status {
181 info!(
182 "Proxy {} status changed: {:?} -> {:?}",
183 proxy.url, old_status, proxy.status
184 );
185 }
186
187 proxy.last_check = Instant::now();
188 }
189 }
190 }
191
192 info!(
193 "Health check completed: {} healthy, {} unhealthy",
194 healthy_count, unhealthy_count
195 );
196 }
197
198 pub fn get_proxy(&self) -> Result<Proxy, NoProxyAvailable> {
200 let proxies = self.proxies.read();
201
202 let healthy_proxies: Vec<&Proxy> = proxies
204 .iter()
205 .filter(|p| p.status == ProxyStatus::Healthy)
206 .collect();
207
208 if healthy_proxies.is_empty() {
209 return Err(NoProxyAvailable);
210 }
211
212 let selected = match self.config.selection_strategy {
214 ProxySelectionStrategy::FastestResponse => {
215 healthy_proxies
217 .iter()
218 .min_by(|a, b| {
219 a.response_time
220 .unwrap_or(f64::MAX)
221 .partial_cmp(&b.response_time.unwrap_or(f64::MAX))
222 .unwrap_or(std::cmp::Ordering::Equal)
223 })
224 .unwrap()
225 }
226 ProxySelectionStrategy::MostReliable => {
227 healthy_proxies
229 .iter()
230 .max_by(|a, b| {
231 a.success_rate()
232 .partial_cmp(&b.success_rate())
233 .unwrap_or(std::cmp::Ordering::Equal)
234 })
235 .unwrap()
236 }
237 ProxySelectionStrategy::Random => {
238 let mut rng = rand::rng();
240 let idx = rng.random_range(0..healthy_proxies.len());
241 &healthy_proxies[idx]
242 }
243 ProxySelectionStrategy::RoundRobin => {
244 let mut last_index = self.last_proxy_index.lock();
246 *last_index = (*last_index + 1) % healthy_proxies.len();
247 &healthy_proxies[*last_index]
248 }
249 };
250
251 Ok((*selected).clone())
252 }
253
254 pub fn report_proxy_success(&self, url: &str) {
256 let mut proxies = self.proxies.write();
257 if let Some(proxy) = proxies.iter_mut().find(|p| p.url == url) {
258 proxy.success_count += 1;
259 proxy.status = ProxyStatus::Healthy;
260 }
261 }
262
263 pub fn report_proxy_failure(&self, url: &str) {
265 let mut proxies = self.proxies.write();
266 if let Some(proxy) = proxies.iter_mut().find(|p| p.url == url) {
267 proxy.failure_count += 1;
268
269 let failure_ratio =
271 proxy.failure_count as f64 / (proxy.success_count + proxy.failure_count) as f64;
272
273 if failure_ratio > 0.5 && proxy.failure_count >= 3 {
274 let old_status = proxy.status;
275 proxy.status = ProxyStatus::Unhealthy;
276
277 if old_status != ProxyStatus::Unhealthy {
278 warn!(
279 "Proxy {} marked unhealthy: {} failures, {} successes",
280 proxy.url, proxy.failure_count, proxy.success_count
281 );
282 }
283 }
284 }
285 }
286
287 pub fn get_stats(&self) -> (usize, usize) {
289 let proxies = self.proxies.read();
290 let total = proxies.len();
291 let healthy = proxies
292 .iter()
293 .filter(|p| p.status == ProxyStatus::Healthy)
294 .count();
295
296 (total, healthy)
297 }
298}