load_balancer/
proxy.rs

1use crate::{
2    BoxLoadBalancer, LoadBalancer,
3    simple::{Entry, SimpleLoadBalancer, SimpleLoadBalancerRef},
4};
5use async_trait::async_trait;
6use reqwest::Proxy;
7use std::{
8    ops::Range,
9    sync::{
10        Arc,
11        atomic::{AtomicUsize, Ordering},
12    },
13    time::Duration,
14};
15use tokio::{
16    spawn,
17    sync::Semaphore,
18    task::JoinHandle,
19    time::{Instant, sleep},
20};
21
/// An advanced proxy pool that measures latency, removes dead proxies,
/// and sorts proxies by response time in ascending order.
///
/// Cloning the pool is how the background check tasks obtain their own handle;
/// the availability counter lives behind an `Arc`, so clones share it.
#[derive(Clone)]
pub struct ProxyPool {
    /// HTTP status codes (half-open range) that count as a successful probe.
    code_range: Range<u16>,
    /// URL requested through each proxy during a health check.
    test_url: String,
    /// Request timeout applied to every probe client.
    timeout: Duration,
    /// Optional upstream proxy that is added to the probe client before the proxy under test.
    proxy: Option<Proxy>,
    /// Number of semaphore permits used to bound concurrently running probes.
    max_check_concurrency: usize,
    /// Entry count recorded by the most recent `extend`/`extend_check`/`check`; starts at 0.
    available_count: Arc<AtomicUsize>,
    /// Underlying load balancer that stores the proxy URLs and hands them out.
    lb: SimpleLoadBalancer<Arc<str>>,
}
34
35impl ProxyPool {
36    /// Create a new `LatencyProxyPool` from a list of proxy URLs.
37    pub fn new<T: IntoIterator<Item = impl AsRef<str>>>(url: T) -> Self {
38        Self {
39            code_range: (200..300),
40            test_url: "https://apple.com".to_string(),
41            timeout: Duration::from_secs(5),
42            proxy: None,
43            max_check_concurrency: 1000,
44            available_count: Arc::new(AtomicUsize::new(0)),
45            lb: SimpleLoadBalancer::new(url.into_iter().map(|v| v.as_ref().into()).collect()),
46        }
47    }
48
49    /// Set the range of HTTP status codes that are considered successful.
50    pub fn code_range(mut self, code_range: Range<u16>) -> Self {
51        self.code_range = code_range;
52        self
53    }
54
55    /// Set the URL used for testing proxy connectivity.
56    pub fn test_url(mut self, test_url: String) -> Self {
57        self.test_url = test_url;
58        self
59    }
60
61    /// Set the request timeout for proxy testing.
62    pub fn timeout(mut self, timeout: Duration) -> Self {
63        self.timeout = timeout;
64        self
65    }
66
67    /// Set an optional upstream proxy for proxy validation.
68    pub fn proxy(mut self, proxy: Proxy) -> Self {
69        self.proxy = Some(proxy);
70        self
71    }
72
73    /// Set the maximum number of concurrent proxy checks during health validation.
74    pub fn max_check_concurrency(mut self, max_check_concurrency: usize) -> Self {
75        self.max_check_concurrency = max_check_concurrency;
76        self
77    }
78
79    /// Get the number of currently available (healthy) proxies.
80    pub fn available_count(&self) -> usize {
81        self.available_count.load(Ordering::Relaxed)
82    }
83
84    /// Add new proxies to the pool without performing immediate validation.
85    ///
86    /// New entries are appended, the cursor is reset, and the available count is updated.
87    /// Validation occurs on the next `check()` call.
88    pub async fn extend<T: IntoIterator<Item = impl AsRef<str>>>(
89        &self,
90        urls: T,
91    ) -> anyhow::Result<()> {
92        let new_entries = urls
93            .into_iter()
94            .map(|v| Entry {
95                value: Arc::from(v.as_ref()),
96            })
97            .collect::<Vec<_>>();
98
99        self.lb
100            .update(async |v| {
101                let mut lock = v.entries.write().await;
102
103                lock.extend(new_entries.clone());
104                v.cursor.store(0, Ordering::Relaxed);
105                self.available_count.store(lock.len(), Ordering::Relaxed);
106
107                Ok(())
108            })
109            .await
110    }
111
112    /// Add new proxies and immediately perform connectivity and latency checks.
113    ///
114    /// Proxies are validated, failed ones are removed, and remaining entries
115    /// are sorted by latency (ascending).
116    pub async fn extend_check<T: IntoIterator<Item = impl AsRef<str>>>(
117        &self,
118        url: T,
119        retry_count: usize,
120    ) -> anyhow::Result<()> {
121        let new_entries = url
122            .into_iter()
123            .map(|v| Entry {
124                value: Arc::from(v.as_ref()),
125            })
126            .collect::<Vec<Entry<Arc<str>>>>();
127
128        self.lb
129            .update(async |v| {
130                let old_entries = {
131                    let lock = v.entries.read().await;
132                    let mut result = Vec::with_capacity(lock.len() + new_entries.len());
133
134                    result.extend_from_slice(&new_entries);
135                    result.extend(lock.iter().cloned());
136
137                    result
138                };
139
140                let result = self.internal_check(&old_entries, retry_count).await?;
141
142                let mut new_entries = Vec::with_capacity(result.len());
143
144                for (index, _) in result {
145                    new_entries.push(old_entries[index].clone());
146                }
147
148                let mut lock = v.entries.write().await;
149
150                *lock = new_entries;
151                v.cursor.store(0, Ordering::Relaxed);
152                self.available_count.store(lock.len(), Ordering::Relaxed);
153
154                Ok(())
155            })
156            .await
157    }
158
159    /// Validate all proxies, remove dead ones, and sort by latency.
160    pub async fn check(&self, retry_count: usize) -> anyhow::Result<()> {
161        self.lb
162            .update(async |v| {
163                let old_entries = v.entries.read().await;
164
165                let result = self.internal_check(&old_entries, retry_count).await?;
166
167                let mut new_entries = Vec::with_capacity(result.len());
168
169                for (index, _) in result {
170                    new_entries.push(old_entries[index].clone());
171                }
172
173                drop(old_entries);
174
175                let mut lock = v.entries.write().await;
176
177                *lock = new_entries;
178                v.cursor.store(0, Ordering::Relaxed);
179                self.available_count.store(lock.len(), Ordering::Relaxed);
180
181                Ok(())
182            })
183            .await
184    }
185
186    /// Spawn a background task to periodically validate proxies and update order by latency.
187    ///
188    /// Returns a `JoinHandle` to allow cancellation or awaiting of the task.
189    pub async fn spawn_check(
190        &self,
191        check_interval: Duration,
192        retry_count: usize,
193    ) -> anyhow::Result<JoinHandle<()>> {
194        self.check(retry_count).await?;
195
196        let this = self.clone();
197
198        Ok(spawn(async move {
199            loop {
200                sleep(check_interval).await;
201                _ = this.check(retry_count).await;
202            }
203        }))
204    }
205
206    /// Spawn a background task with a callback after each check.
207    pub async fn spawn_check_callback<F, R>(
208        &self,
209        check_interval: Duration,
210        retry_count: usize,
211        callback: F,
212    ) -> anyhow::Result<JoinHandle<anyhow::Result<()>>>
213    where
214        F: Fn() -> R + Send + 'static,
215        R: Future<Output = anyhow::Result<()>> + Send,
216    {
217        self.check(retry_count).await?;
218        callback().await?;
219
220        let this = self.clone();
221
222        Ok(spawn(async move {
223            loop {
224                sleep(check_interval).await;
225                _ = this.check(retry_count).await;
226                callback().await?;
227            }
228        }))
229    }
230
231    /// Update the load balancer using a custom async handler.
232    pub async fn update<F, R>(&self, handle: F) -> anyhow::Result<()>
233    where
234        F: Fn(Arc<SimpleLoadBalancerRef<Arc<str>>>) -> R,
235        R: Future<Output = anyhow::Result<()>>,
236    {
237        self.lb.update(handle).await
238    }
239
240    async fn internal_check(
241        &self,
242        entries: &Vec<Entry<Arc<str>>>,
243        retry_count: usize,
244    ) -> anyhow::Result<Vec<(usize, u128)>> {
245        let semaphore = Arc::new(Semaphore::new(self.max_check_concurrency));
246        let mut task = Vec::with_capacity(entries.len());
247
248        for (index, entry) in entries.iter().enumerate() {
249            let permit = semaphore.clone().acquire_owned().await.unwrap();
250            let entry = entry.clone();
251            let code_range = self.code_range.clone();
252            let test_url = self.test_url.clone();
253            let timeout = self.timeout;
254            let upstream_proxy = self.proxy.clone();
255            let entry_value = entry.value.clone();
256
257            task.push(tokio::spawn(async move {
258                let _permit = permit;
259                let mut latency = None;
260
261                for _ in 0..=retry_count {
262                    let client = if let Some(proxy) = upstream_proxy.clone() {
263                        reqwest::ClientBuilder::new()
264                            .proxy(proxy)
265                            .proxy(Proxy::all(&*entry_value)?)
266                            .timeout(timeout)
267                            .build()?
268                    } else {
269                        reqwest::ClientBuilder::new()
270                            .proxy(Proxy::all(&*entry_value)?)
271                            .timeout(timeout)
272                            .build()?
273                    };
274
275                    let start = Instant::now();
276
277                    if let Ok(v) = client.get(&test_url).send().await {
278                        if code_range.contains(&v.status().as_u16()) {
279                            latency = Some(start.elapsed().as_millis());
280                            break;
281                        }
282                    }
283                }
284
285                anyhow::Ok(latency.map(|v| (index, v)))
286            }));
287        }
288
289        let mut result = Vec::new();
290
291        for i in task {
292            if let Ok(Ok(Some(r))) = i.await {
293                result.push(r);
294            }
295        }
296
297        result.sort_by_key(|(_, latency)| *latency);
298
299        Ok(result)
300    }
301}
302
303impl LoadBalancer<String> for ProxyPool {
304    async fn alloc(&self) -> String {
305        LoadBalancer::alloc(&self.lb).await.to_string()
306    }
307
308    fn try_alloc(&self) -> Option<String> {
309        LoadBalancer::try_alloc(&self.lb).map(|v| v.to_string())
310    }
311}
312
313#[async_trait]
314impl BoxLoadBalancer<String> for ProxyPool {
315    async fn alloc(&self) -> String {
316        LoadBalancer::alloc(self).await
317    }
318
319    fn try_alloc(&self) -> Option<String> {
320        LoadBalancer::try_alloc(self)
321    }
322}