1use crate::{
2 BoxLoadBalancer, LoadBalancer,
3 simple::{Entry, SimpleLoadBalancer, SimpleLoadBalancerRef},
4};
5use async_trait::async_trait;
6use reqwest::Proxy;
7use std::{
8 ops::Range,
9 sync::{
10 Arc,
11 atomic::{AtomicUsize, Ordering},
12 },
13 time::Duration,
14};
15use tokio::{
16 spawn,
17 sync::Semaphore,
18 task::JoinHandle,
19 time::{Instant, sleep},
20};
21
22#[derive(Clone)]
25pub struct ProxyPool {
26 code_range: Range<u16>,
27 test_url: String,
28 timeout: Duration,
29 proxy: Option<Proxy>,
30 max_check_concurrency: usize,
31 available_count: Arc<AtomicUsize>,
32 lb: SimpleLoadBalancer<Arc<str>>,
33}
34
35impl ProxyPool {
36 pub fn new<T: IntoIterator<Item = impl AsRef<str>>>(url: T) -> Self {
38 Self {
39 code_range: (200..300),
40 test_url: "https://apple.com".to_string(),
41 timeout: Duration::from_secs(5),
42 proxy: None,
43 max_check_concurrency: 1000,
44 available_count: Arc::new(AtomicUsize::new(0)),
45 lb: SimpleLoadBalancer::new(url.into_iter().map(|v| v.as_ref().into()).collect()),
46 }
47 }
48
49 pub fn code_range(mut self, code_range: Range<u16>) -> Self {
51 self.code_range = code_range;
52 self
53 }
54
55 pub fn test_url(mut self, test_url: String) -> Self {
57 self.test_url = test_url;
58 self
59 }
60
61 pub fn timeout(mut self, timeout: Duration) -> Self {
63 self.timeout = timeout;
64 self
65 }
66
67 pub fn proxy(mut self, proxy: Proxy) -> Self {
69 self.proxy = Some(proxy);
70 self
71 }
72
73 pub fn max_check_concurrency(mut self, max_check_concurrency: usize) -> Self {
75 self.max_check_concurrency = max_check_concurrency;
76 self
77 }
78
79 pub fn available_count(&self) -> usize {
81 self.available_count.load(Ordering::Relaxed)
82 }
83
84 pub async fn extend<T: IntoIterator<Item = impl AsRef<str>>>(
89 &self,
90 urls: T,
91 ) -> anyhow::Result<()> {
92 let new_entries = urls
93 .into_iter()
94 .map(|v| Entry {
95 value: Arc::from(v.as_ref()),
96 })
97 .collect::<Vec<_>>();
98
99 self.lb
100 .update(async |v| {
101 let mut lock = v.entries.write().await;
102
103 lock.extend(new_entries.clone());
104 v.cursor.store(0, Ordering::Relaxed);
105 self.available_count.store(lock.len(), Ordering::Relaxed);
106
107 Ok(())
108 })
109 .await
110 }
111
112 pub async fn extend_check<T: IntoIterator<Item = impl AsRef<str>>>(
117 &self,
118 url: T,
119 retry_count: usize,
120 ) -> anyhow::Result<()> {
121 let new_entries = url
122 .into_iter()
123 .map(|v| Entry {
124 value: Arc::from(v.as_ref()),
125 })
126 .collect::<Vec<Entry<Arc<str>>>>();
127
128 self.lb
129 .update(async |v| {
130 let old_entries = {
131 let lock = v.entries.read().await;
132 let mut result = Vec::with_capacity(lock.len() + new_entries.len());
133
134 result.extend_from_slice(&new_entries);
135 result.extend(lock.iter().cloned());
136
137 result
138 };
139
140 let result = self.internal_check(&old_entries, retry_count).await?;
141
142 let mut new_entries = Vec::with_capacity(result.len());
143
144 for (index, _) in result {
145 new_entries.push(old_entries[index].clone());
146 }
147
148 let mut lock = v.entries.write().await;
149
150 *lock = new_entries;
151 v.cursor.store(0, Ordering::Relaxed);
152 self.available_count.store(lock.len(), Ordering::Relaxed);
153
154 Ok(())
155 })
156 .await
157 }
158
159 pub async fn check(&self, retry_count: usize) -> anyhow::Result<()> {
161 self.lb
162 .update(async |v| {
163 let old_entries = v.entries.read().await;
164
165 let result = self.internal_check(&old_entries, retry_count).await?;
166
167 let mut new_entries = Vec::with_capacity(result.len());
168
169 for (index, _) in result {
170 new_entries.push(old_entries[index].clone());
171 }
172
173 drop(old_entries);
174
175 let mut lock = v.entries.write().await;
176
177 *lock = new_entries;
178 v.cursor.store(0, Ordering::Relaxed);
179 self.available_count.store(lock.len(), Ordering::Relaxed);
180
181 Ok(())
182 })
183 .await
184 }
185
186 pub async fn spawn_check(
190 &self,
191 check_interval: Duration,
192 retry_count: usize,
193 ) -> anyhow::Result<JoinHandle<()>> {
194 self.check(retry_count).await?;
195
196 let this = self.clone();
197
198 Ok(spawn(async move {
199 loop {
200 sleep(check_interval).await;
201 _ = this.check(retry_count).await;
202 }
203 }))
204 }
205
206 pub async fn spawn_check_callback<F, R>(
208 &self,
209 check_interval: Duration,
210 retry_count: usize,
211 callback: F,
212 ) -> anyhow::Result<JoinHandle<anyhow::Result<()>>>
213 where
214 F: Fn() -> R + Send + 'static,
215 R: Future<Output = anyhow::Result<()>> + Send,
216 {
217 self.check(retry_count).await?;
218 callback().await?;
219
220 let this = self.clone();
221
222 Ok(spawn(async move {
223 loop {
224 sleep(check_interval).await;
225 _ = this.check(retry_count).await;
226 callback().await?;
227 }
228 }))
229 }
230
231 pub async fn update<F, R>(&self, handle: F) -> anyhow::Result<()>
233 where
234 F: Fn(Arc<SimpleLoadBalancerRef<Arc<str>>>) -> R,
235 R: Future<Output = anyhow::Result<()>>,
236 {
237 self.lb.update(handle).await
238 }
239
240 async fn internal_check(
241 &self,
242 entries: &Vec<Entry<Arc<str>>>,
243 retry_count: usize,
244 ) -> anyhow::Result<Vec<(usize, u128)>> {
245 let semaphore = Arc::new(Semaphore::new(self.max_check_concurrency));
246 let mut task = Vec::with_capacity(entries.len());
247
248 for (index, entry) in entries.iter().enumerate() {
249 let permit = semaphore.clone().acquire_owned().await.unwrap();
250 let entry = entry.clone();
251 let code_range = self.code_range.clone();
252 let test_url = self.test_url.clone();
253 let timeout = self.timeout;
254 let upstream_proxy = self.proxy.clone();
255 let entry_value = entry.value.clone();
256
257 task.push(tokio::spawn(async move {
258 let _permit = permit;
259 let mut latency = None;
260
261 for _ in 0..=retry_count {
262 let client = if let Some(proxy) = upstream_proxy.clone() {
263 reqwest::ClientBuilder::new()
264 .proxy(proxy)
265 .proxy(Proxy::all(&*entry_value)?)
266 .timeout(timeout)
267 .build()?
268 } else {
269 reqwest::ClientBuilder::new()
270 .proxy(Proxy::all(&*entry_value)?)
271 .timeout(timeout)
272 .build()?
273 };
274
275 let start = Instant::now();
276
277 if let Ok(v) = client.get(&test_url).send().await {
278 if code_range.contains(&v.status().as_u16()) {
279 latency = Some(start.elapsed().as_millis());
280 break;
281 }
282 }
283 }
284
285 anyhow::Ok(latency.map(|v| (index, v)))
286 }));
287 }
288
289 let mut result = Vec::new();
290
291 for i in task {
292 if let Ok(Ok(Some(r))) = i.await {
293 result.push(r);
294 }
295 }
296
297 result.sort_by_key(|(_, latency)| *latency);
298
299 Ok(result)
300 }
301}
302
303impl LoadBalancer<String> for ProxyPool {
304 async fn alloc(&self) -> String {
305 LoadBalancer::alloc(&self.lb).await.to_string()
306 }
307
308 fn try_alloc(&self) -> Option<String> {
309 LoadBalancer::try_alloc(&self.lb).map(|v| v.to_string())
310 }
311}
312
313#[async_trait]
314impl BoxLoadBalancer<String> for ProxyPool {
315 async fn alloc(&self) -> String {
316 LoadBalancer::alloc(self).await
317 }
318
319 fn try_alloc(&self) -> Option<String> {
320 LoadBalancer::try_alloc(self)
321 }
322}