load_balancer/
proxy.rs

1use crate::{
2    BoxLoadBalancer, LoadBalancer,
3    simple::{Entry, SimpleLoadBalancer},
4};
5use async_trait::async_trait;
6use reqwest::Proxy;
7use std::{
8    sync::{
9        Arc,
10        atomic::{AtomicUsize, Ordering},
11    },
12    time::Duration,
13};
14use tokio::{
15    spawn,
16    task::JoinHandle,
17    time::{Instant, sleep},
18};
19
20/// An advanced proxy pool that measures latency, removes dead proxies,
21/// and sorts proxies by response time in ascending order.
22#[derive(Clone)]
23pub struct ProxyPool {
24    test_url: String,
25    proxy: Option<Proxy>,
26    timeout: Duration,
27    available_count: Arc<AtomicUsize>,
28    lb: SimpleLoadBalancer<Arc<str>>,
29}
30
31impl ProxyPool {
32    /// Create a new `LatencyProxyPool` from a list of proxy URLs.
33    pub fn new<T: IntoIterator<Item = impl AsRef<str>>>(url: T) -> Self {
34        Self {
35            test_url: "https://bilibili.com".to_string(),
36            proxy: None,
37            timeout: Duration::from_secs(3),
38            available_count: Arc::new(AtomicUsize::new(0)),
39            lb: SimpleLoadBalancer::new(url.into_iter().map(|v| v.as_ref().into()).collect()),
40        }
41    }
42
43    /// Set the URL used for testing proxy connectivity.
44    pub fn test_url(mut self, test_url: String) -> Self {
45        self.test_url = test_url;
46        self
47    }
48
49    /// Set the request timeout for proxy testing.
50    pub fn timeout(mut self, timeout: Duration) -> Self {
51        self.timeout = timeout;
52        self
53    }
54
55    /// Set an optional upstream proxy for proxy validation.
56    pub fn proxy(mut self, proxy: Proxy) -> Self {
57        self.proxy = Some(proxy);
58        self
59    }
60
61    /// Get the number of currently available (healthy) proxies.
62    pub fn available_count(&self) -> usize {
63        self.available_count.load(Ordering::Relaxed)
64    }
65
66    /// Add new proxies to the pool without performing immediate validation.
67    ///
68    /// New entries are appended, the cursor is reset, and the available count is updated.
69    /// Validation occurs on the next `check()` call.
70    pub async fn extend<T: IntoIterator<Item = impl AsRef<str>>>(
71        &self,
72        urls: T,
73    ) -> anyhow::Result<()> {
74        let new_entries = urls
75            .into_iter()
76            .map(|v| Entry {
77                value: Arc::from(v.as_ref()),
78            })
79            .collect::<Vec<_>>();
80
81        self.lb
82            .update(async |v| {
83                let mut lock = v.entries.write().await;
84
85                lock.extend(new_entries.clone());
86                v.cursor.store(0, Ordering::Relaxed);
87                self.available_count.store(lock.len(), Ordering::Relaxed);
88
89                Ok(())
90            })
91            .await
92    }
93
94    /// Add new proxies and immediately perform connectivity and latency checks.
95    ///
96    /// Proxies are validated, failed ones are removed, and remaining entries
97    /// are sorted by latency (ascending).
98    pub async fn extend_check<T: IntoIterator<Item = impl AsRef<str>>>(
99        &self,
100        url: T,
101        retry_count: usize,
102    ) -> anyhow::Result<()> {
103        let new_entries = url
104            .into_iter()
105            .map(|v| Entry {
106                value: Arc::from(v.as_ref()),
107            })
108            .collect::<Vec<Entry<Arc<str>>>>();
109
110        self.lb
111            .update(async |v| {
112                let old_entries = {
113                    let lock = v.entries.read().await;
114                    let mut result = Vec::with_capacity(lock.len() + new_entries.len());
115
116                    result.extend_from_slice(&new_entries);
117                    result.extend(lock.iter().cloned());
118
119                    result
120                };
121
122                let mut result = self.internal_check(&old_entries, retry_count).await?;
123
124                result.sort_by_key(|(_, latency)| *latency);
125
126                let mut new_entries = Vec::with_capacity(result.len());
127
128                for (index, _) in result {
129                    new_entries.push(old_entries[index].clone());
130                }
131
132                let mut lock = v.entries.write().await;
133
134                *lock = new_entries;
135                v.cursor.store(0, Ordering::Relaxed);
136                self.available_count.store(lock.len(), Ordering::Relaxed);
137
138                Ok(())
139            })
140            .await
141    }
142
143    /// Validate all proxies, remove dead ones, and sort by latency.
144    pub async fn check(&self, retry_count: usize) -> anyhow::Result<()> {
145        self.lb
146            .update(async |v| {
147                let old_entries = v.entries.read().await;
148
149                let mut result = self.internal_check(&old_entries, retry_count).await?;
150
151                result.sort_by_key(|(_, latency)| *latency);
152
153                let mut new_entries = Vec::with_capacity(result.len());
154
155                for (index, _) in result {
156                    new_entries.push(old_entries[index].clone());
157                }
158
159                drop(old_entries);
160
161                let mut lock = v.entries.write().await;
162
163                *lock = new_entries;
164                v.cursor.store(0, Ordering::Relaxed);
165                self.available_count.store(lock.len(), Ordering::Relaxed);
166
167                Ok(())
168            })
169            .await
170    }
171
172    /// Spawn a background task to periodically validate proxies and update order by latency.
173    ///
174    /// Returns a `JoinHandle` to allow cancellation or awaiting of the task.
175    pub async fn spawn_check(
176        &self,
177        check_interval: Duration,
178        retry_count: usize,
179    ) -> anyhow::Result<JoinHandle<()>> {
180        self.check(retry_count).await?;
181
182        let this = self.clone();
183
184        Ok(spawn(async move {
185            loop {
186                sleep(check_interval).await;
187                _ = this.check(retry_count).await;
188            }
189        }))
190    }
191
192    async fn internal_check(
193        &self,
194        entries: &Vec<Entry<Arc<str>>>,
195        retry_count: usize,
196    ) -> anyhow::Result<Vec<(usize, u128)>> {
197        let mut result = Vec::new();
198
199        for (index, entry) in entries.iter().enumerate() {
200            let mut latency = None;
201
202            for _ in 0..=retry_count {
203                let client = if let Some(proxy) = self.proxy.clone() {
204                    reqwest::ClientBuilder::new()
205                        .proxy(proxy)
206                        .proxy(Proxy::all(&*entry.value)?)
207                        .timeout(self.timeout)
208                        .build()?
209                } else {
210                    reqwest::ClientBuilder::new()
211                        .proxy(Proxy::all(&*entry.value)?)
212                        .timeout(self.timeout)
213                        .build()?
214                };
215
216                let start = Instant::now();
217
218                if let Ok(v) = client.get(&self.test_url).send().await {
219                    if v.status().is_success() {
220                        latency = Some(start.elapsed().as_millis());
221                        break;
222                    }
223                }
224            }
225
226            if let Some(v) = latency {
227                result.push((index, v));
228            }
229        }
230
231        Ok(result)
232    }
233}
234
235impl LoadBalancer<String> for ProxyPool {
236    async fn alloc(&self) -> String {
237        LoadBalancer::alloc(&self.lb).await.to_string()
238    }
239
240    fn try_alloc(&self) -> Option<String> {
241        LoadBalancer::try_alloc(&self.lb).map(|v| v.to_string())
242    }
243}
244
245#[async_trait]
246impl BoxLoadBalancer<String> for ProxyPool {
247    async fn alloc(&self) -> String {
248        LoadBalancer::alloc(self).await
249    }
250
251    fn try_alloc(&self) -> Option<String> {
252        LoadBalancer::try_alloc(self)
253    }
254}