// reqwest_proxy_pool/pool.rs
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Instant;

use futures::future;
use log::{info, warn};
use parking_lot::{Mutex, RwLock};
use rand::Rng;
use tokio::time::{self};

use crate::config::{ProxyPoolConfig, ProxySelectionStrategy};
use crate::error::NoProxyAvailable;
use crate::proxy::{Proxy, ProxyStatus};
use crate::utils;

17pub struct ProxyPool {
19 proxies: RwLock<Vec<Proxy>>,
21 pub config: ProxyPoolConfig,
23 last_proxy_index: Mutex<usize>,
25}
26
27impl ProxyPool {
28 pub async fn new(config: ProxyPoolConfig) -> Result<Arc<Self>, reqwest::Error> {
31 let pool = Arc::new(Self {
32 proxies: RwLock::new(Vec::new()),
33 config,
34 last_proxy_index: Mutex::new(0),
35 });
36
37 pool.initialize_proxies().await?;
39
40 info!("Starting synchronous initial health check");
42 pool.check_all_proxies().await;
43
44 let (total, healthy) = pool.get_stats();
46 info!("Initial proxy pool status: {}/{} healthy proxies", healthy, total);
47
48 let pool_clone = Arc::clone(&pool);
50 tokio::spawn(async move {
51 loop {
52 time::sleep(pool_clone.config.health_check_interval).await;
53 pool_clone.check_all_proxies().await;
54
55 let (total, healthy) = pool_clone.get_stats();
56 info!("Proxy pool status update: {}/{} healthy proxies", healthy, total);
57 }
58 });
59
60 Ok(pool)
61 }
62
63 async fn initialize_proxies(&self) -> Result<(), reqwest::Error> {
65 info!("Initializing proxy pool from {} sources", self.config.sources.len());
66
67 let mut all_proxies = HashSet::new();
68
69 for source in &self.config.sources {
71 match utils::fetch_proxies_from_source(source).await {
72 Ok(source_proxies) => {
73 info!("Fetched {} proxies from {}", source_proxies.len(), source);
74 all_proxies.extend(source_proxies);
75 }
76 Err(e) => {
77 warn!("Failed to fetch proxies from {}: {}", source, e);
78 }
79 }
80 }
81
82 info!("Found {} unique proxies before health check", all_proxies.len());
83
84 {
86 let mut proxies = self.proxies.write();
87 for url in all_proxies {
88 proxies.push(Proxy::new(url, self.config.max_requests_per_second));
89 }
90 }
91
92 Ok(())
93 }
94
95 pub async fn check_all_proxies(&self) {
97 info!("Starting health check for all proxies");
98
99 let proxies = {
100 let guard = self.proxies.read();
101 guard.clone()
102 };
103
104 let mut futures = Vec::new();
105
106 for proxy in &proxies {
107 let proxy_url = proxy.url.clone();
108 let check_url = self.config.health_check_url.clone();
109 let timeout = self.config.health_check_timeout;
110
111 let future = async move {
112 let start = Instant::now();
113
114 let proxy_client = match reqwest::Client::builder()
116 .timeout(timeout)
117 .proxy(reqwest::Proxy::all(&proxy_url).unwrap_or_else(|_| {
118 reqwest::Proxy::custom(move |_| -> Option<reqwest::Url> { None })
120 }))
121 .build() {
122 Ok(client) => client,
123 Err(_) => return (proxy_url, false, None),
124 };
125
126 match proxy_client.get(&check_url).send().await {
128 Ok(resp) if resp.status().is_success() => {
129 let elapsed = start.elapsed().as_secs_f64();
130 (proxy_url, true, Some(elapsed))
131 }
132 _ => (proxy_url, false, None),
133 }
134 };
135
136 futures.push(future);
137 }
138
139 let results = future::join_all(futures).await;
141
142 let mut healthy_count = 0;
143 let mut unhealthy_count = 0;
144
145 {
147 let mut proxies = self.proxies.write();
148
149 for (url, is_healthy, response_time) in results {
150 if let Some(proxy) = proxies.iter_mut().find(|p| p.url == url) {
151 let old_status = proxy.status;
152
153 if is_healthy {
154 proxy.status = ProxyStatus::Healthy;
155 proxy.response_time = response_time;
156 healthy_count += 1;
157 } else {
158 proxy.status = ProxyStatus::Unhealthy;
159 unhealthy_count += 1;
160 }
161
162 if old_status != proxy.status {
164 info!("Proxy {} status changed: {:?} -> {:?}",
165 proxy.url, old_status, proxy.status);
166 }
167
168 proxy.last_check = Instant::now();
169 }
170 }
171 }
172
173 info!("Health check completed: {} healthy, {} unhealthy",
174 healthy_count, unhealthy_count);
175 }
176
177 pub fn get_proxy(&self) -> Result<Proxy, NoProxyAvailable> {
179 let proxies = self.proxies.read();
180
181 let healthy_proxies: Vec<&Proxy> = proxies.iter()
183 .filter(|p| p.status == ProxyStatus::Healthy)
184 .collect();
185
186 if healthy_proxies.is_empty() {
187 return Err(NoProxyAvailable);
188 }
189
190 let selected = match self.config.selection_strategy {
192 ProxySelectionStrategy::FastestResponse => {
193 healthy_proxies.iter()
195 .min_by(|a, b| {
196 a.response_time.unwrap_or(f64::MAX)
197 .partial_cmp(&b.response_time.unwrap_or(f64::MAX))
198 .unwrap_or(std::cmp::Ordering::Equal)
199 })
200 .unwrap()
201 },
202 ProxySelectionStrategy::MostReliable => {
203 healthy_proxies.iter()
205 .max_by(|a, b| {
206 a.success_rate().partial_cmp(&b.success_rate())
207 .unwrap_or(std::cmp::Ordering::Equal)
208 })
209 .unwrap()
210 },
211 ProxySelectionStrategy::Random => {
212 let mut rng = rand::rng();
214 let idx = rng.random_range(0..healthy_proxies.len());
215 &healthy_proxies[idx]
216 },
217 ProxySelectionStrategy::RoundRobin => {
218 let mut last_index = self.last_proxy_index.lock();
220 *last_index = (*last_index + 1) % healthy_proxies.len();
221 &healthy_proxies[*last_index]
222 }
223 };
224
225 Ok((*selected).clone())
226 }
227
228 pub fn report_proxy_success(&self, url: &str) {
230 let mut proxies = self.proxies.write();
231 if let Some(proxy) = proxies.iter_mut().find(|p| p.url == url) {
232 proxy.success_count += 1;
233 proxy.status = ProxyStatus::Healthy;
234 }
235 }
236
237 pub fn report_proxy_failure(&self, url: &str) {
239 let mut proxies = self.proxies.write();
240 if let Some(proxy) = proxies.iter_mut().find(|p| p.url == url) {
241 proxy.failure_count += 1;
242
243 let failure_ratio = proxy.failure_count as f64 /
245 (proxy.success_count + proxy.failure_count) as f64;
246
247 if failure_ratio > 0.5 && proxy.failure_count >= 3 {
248 let old_status = proxy.status;
249 proxy.status = ProxyStatus::Unhealthy;
250
251 if old_status != ProxyStatus::Unhealthy {
252 warn!("Proxy {} marked unhealthy: {} failures, {} successes",
253 proxy.url, proxy.failure_count, proxy.success_count);
254 }
255 }
256 }
257 }
258
259 pub fn get_stats(&self) -> (usize, usize) {
261 let proxies = self.proxies.read();
262 let total = proxies.len();
263 let healthy = proxies.iter()
264 .filter(|p| p.status == ProxyStatus::Healthy)
265 .count();
266
267 (total, healthy)
268 }
269}