// reqwest_proxy_pool/pool.rs
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Instant;

use futures::future;
use log::{info, warn};
use parking_lot::{Mutex, RwLock};
use rand::RngExt;
use tokio::time::{self};

use crate::config::{HostConfig, ProxySelectionStrategy};
use crate::error::NoProxyAvailable;
use crate::proxy::{Proxy, ProxyStatus};
use crate::utils;
16
17pub struct ProxyPool {
19 sources: Vec<String>,
21 proxies: RwLock<Vec<Proxy>>,
23 pub config: HostConfig,
25 last_proxy_index: Mutex<usize>,
27}
28
29impl ProxyPool {
30 pub async fn new(
33 sources: Vec<String>,
34 config: HostConfig,
35 ) -> Result<Arc<Self>, reqwest::Error> {
36 let pool = Arc::new(Self {
37 sources,
38 proxies: RwLock::new(Vec::new()),
39 config,
40 last_proxy_index: Mutex::new(0),
41 });
42
43 pool.initialize_proxies().await?;
45
46 info!("Starting synchronous initial health check");
48 pool.check_all_proxies().await;
49
50 let (total, healthy) = pool.get_stats();
52 info!(
53 "Initial proxy pool status: {}/{} healthy proxies",
54 healthy, total
55 );
56 if healthy < pool.config.min_available_proxies {
57 warn!(
58 "Healthy proxies below minimum for host [{}]: {} < {}",
59 pool.config.host(),
60 healthy,
61 pool.config.min_available_proxies
62 );
63 }
64
65 let pool_clone = Arc::clone(&pool);
67 tokio::spawn(async move {
68 loop {
69 time::sleep(pool_clone.config.health_check_interval).await;
70 pool_clone.check_all_proxies().await;
71
72 let (total, healthy) = pool_clone.get_stats();
73 info!(
74 "Proxy pool status update: {}/{} healthy proxies",
75 healthy, total
76 );
77 if healthy < pool_clone.config.min_available_proxies {
78 warn!(
79 "Healthy proxies below minimum for host [{}]: {} < {}",
80 pool_clone.config.host(),
81 healthy,
82 pool_clone.config.min_available_proxies
83 );
84 }
85 }
86 });
87
88 Ok(pool)
89 }
90
91 async fn initialize_proxies(&self) -> Result<(), reqwest::Error> {
93 info!(
94 "Initializing proxy pool from {} sources",
95 self.sources.len()
96 );
97
98 let mut all_proxies = HashSet::new();
99
100 for source in &self.sources {
102 match utils::fetch_proxies_from_source(source).await {
103 Ok(source_proxies) => {
104 info!("Fetched {} proxies from {}", source_proxies.len(), source);
105 all_proxies.extend(source_proxies);
106 }
107 Err(e) => {
108 warn!("Failed to fetch proxies from {}: {}", source, e);
109 }
110 }
111 }
112
113 info!(
114 "Found {} unique proxies before health check",
115 all_proxies.len()
116 );
117
118 {
120 let mut proxies = self.proxies.write();
121 for url in all_proxies {
122 proxies.push(Proxy::new(url, self.config.min_request_interval_ms));
123 }
124 }
125
126 Ok(())
127 }
128
129 pub async fn check_all_proxies(&self) {
131 info!("Starting health check for all proxies");
132
133 let proxies = {
134 let guard = self.proxies.read();
135 guard.clone()
136 };
137 let mut futures = Vec::new();
138
139 for proxy in &proxies {
140 let proxy_url = proxy.url.clone();
141 let check_url = self.config.health_check_url.clone();
142 let timeout = self.config.health_check_timeout;
143
144 let future = async move {
145 let start = Instant::now();
146
147 let reqwest_proxy = match proxy.to_reqwest_proxy() {
149 Ok(proxy) => proxy,
150 Err(_) => return (proxy_url, false, None),
151 };
152
153 let proxy_client = match reqwest::Client::builder()
154 .timeout(timeout)
155 .danger_accept_invalid_certs(true)
156 .proxy(reqwest_proxy)
157 .build()
158 {
159 Ok(client) => client,
160 Err(_) => return (proxy_url, false, None),
161 };
162
163 match proxy_client.get(&check_url).send().await {
165 Ok(resp) if resp.status().is_success() => {
166 let elapsed = start.elapsed().as_secs_f64();
167 (proxy_url, true, Some(elapsed))
168 }
169 _ => (proxy_url, false, None),
170 }
171 };
172
173 futures.push(future);
174 }
175
176 let results = future::join_all(futures).await;
178
179 let mut healthy_count = 0;
180 let mut unhealthy_count = 0;
181
182 {
184 let mut proxies = self.proxies.write();
185
186 for (url, is_healthy, response_time) in results {
187 if let Some(proxy) = proxies.iter_mut().find(|p| p.url == url) {
188 let old_status = proxy.status;
189
190 if is_healthy {
191 proxy.status = ProxyStatus::Healthy;
192 proxy.response_time = response_time;
193 proxy.cooldown_until = None;
194 healthy_count += 1;
195 } else {
196 proxy.status = ProxyStatus::Unhealthy;
197 proxy.cooldown_until = Some(Instant::now() + self.config.proxy_cooldown);
198 unhealthy_count += 1;
199 }
200
201 if old_status != proxy.status {
203 info!(
204 "Proxy {} status changed: {:?} -> {:?}",
205 proxy.url, old_status, proxy.status
206 );
207 }
208
209 proxy.last_check = Instant::now();
210 }
211 }
212 }
213
214 info!(
215 "Health check completed: {} healthy, {} unhealthy",
216 healthy_count, unhealthy_count
217 );
218 }
219
220 pub fn get_proxy(&self) -> Result<Proxy, NoProxyAvailable> {
222 self.get_proxy_internal(None)
223 }
224
225 pub fn get_proxy_excluding(
227 &self,
228 excluded: &HashSet<String>,
229 ) -> Result<Proxy, NoProxyAvailable> {
230 self.get_proxy_internal(Some(excluded))
231 }
232
233 fn get_proxy_internal(
234 &self,
235 excluded: Option<&HashSet<String>>,
236 ) -> Result<Proxy, NoProxyAvailable> {
237 let now = Instant::now();
238 let mut proxies = self.proxies.write();
239
240 for proxy in proxies.iter_mut() {
242 if proxy.status == ProxyStatus::Unhealthy
243 && proxy
244 .cooldown_until
245 .is_some_and(|cooldown_until| cooldown_until <= now)
246 {
247 proxy.status = ProxyStatus::HalfOpen;
248 proxy.cooldown_until = None;
249 }
250 }
251
252 let healthy_proxies: Vec<&Proxy> = proxies
254 .iter()
255 .filter(|p| matches!(p.status, ProxyStatus::Healthy | ProxyStatus::HalfOpen))
256 .filter(|p| excluded.map(|urls| !urls.contains(&p.url)).unwrap_or(true))
257 .collect();
258
259 if healthy_proxies.is_empty() {
260 return Err(NoProxyAvailable);
261 }
262
263 let selected = match self.config.selection_strategy {
265 ProxySelectionStrategy::FastestResponse => {
266 healthy_proxies
268 .iter()
269 .min_by(|a, b| {
270 a.response_time
271 .unwrap_or(f64::MAX)
272 .partial_cmp(&b.response_time.unwrap_or(f64::MAX))
273 .unwrap_or(std::cmp::Ordering::Equal)
274 })
275 .unwrap()
276 }
277 ProxySelectionStrategy::MostReliable => {
278 healthy_proxies
280 .iter()
281 .max_by(|a, b| {
282 a.success_rate()
283 .partial_cmp(&b.success_rate())
284 .unwrap_or(std::cmp::Ordering::Equal)
285 })
286 .unwrap()
287 }
288 ProxySelectionStrategy::TopKReliableRandom => {
289 let mut ranked = healthy_proxies;
290 ranked.sort_by(|a, b| {
291 b.success_rate()
292 .partial_cmp(&a.success_rate())
293 .unwrap_or(std::cmp::Ordering::Equal)
294 });
295 let top_k = self.config.reliable_top_k.min(ranked.len()).max(1);
296 let mut rng = rand::rng();
297 let idx = rng.random_range(0..top_k);
298 ranked[idx]
299 }
300 ProxySelectionStrategy::Random => {
301 let mut rng = rand::rng();
303 let idx = rng.random_range(0..healthy_proxies.len());
304 healthy_proxies[idx]
305 }
306 ProxySelectionStrategy::RoundRobin => {
307 let mut last_index = self.last_proxy_index.lock();
309 *last_index = (*last_index + 1) % healthy_proxies.len();
310 healthy_proxies[*last_index]
311 }
312 };
313
314 Ok((*selected).clone())
315 }
316
317 pub fn report_proxy_success(&self, url: &str) {
319 let mut proxies = self.proxies.write();
320 if let Some(proxy) = proxies.iter_mut().find(|p| p.url == url) {
321 proxy.success_count += 1;
322 proxy.status = ProxyStatus::Healthy;
323 proxy.cooldown_until = None;
324 }
325 }
326
327 pub fn report_proxy_failure(&self, url: &str) {
329 let mut proxies = self.proxies.write();
330 if let Some(proxy) = proxies.iter_mut().find(|p| p.url == url) {
331 proxy.failure_count += 1;
332 let old_status = proxy.status;
333 proxy.status = ProxyStatus::Unhealthy;
334 proxy.cooldown_until = Some(Instant::now() + self.config.proxy_cooldown);
335
336 if old_status != ProxyStatus::Unhealthy {
337 warn!(
338 "Proxy {} marked unhealthy: {} failures, {} successes, cooldown {:?}",
339 proxy.url, proxy.failure_count, proxy.success_count, self.config.proxy_cooldown
340 );
341 }
342 }
343 }
344
345 pub fn get_stats(&self) -> (usize, usize) {
347 let proxies = self.proxies.read();
348 let total = proxies.len();
349 let healthy = proxies
350 .iter()
351 .filter(|p| matches!(p.status, ProxyStatus::Healthy | ProxyStatus::HalfOpen))
352 .count();
353
354 (total, healthy)
355 }
356}