1use crate::{
2 BoxLoadBalancer, LoadBalancer,
3 simple::{Entry, SimpleLoadBalancer},
4};
5use async_trait::async_trait;
6use reqwest::Proxy;
7use std::{
8 sync::{
9 Arc,
10 atomic::{AtomicUsize, Ordering},
11 },
12 time::Duration,
13};
14use tokio::{
15 spawn,
16 task::JoinHandle,
17 time::{Instant, sleep},
18};
19
20#[derive(Clone)]
23pub struct ProxyPool {
24 test_url: String,
25 proxy: Option<Proxy>,
26 timeout: Duration,
27 available_count: Arc<AtomicUsize>,
28 lb: SimpleLoadBalancer<Arc<str>>,
29}
30
31impl ProxyPool {
32 pub fn new<T: IntoIterator<Item = impl AsRef<str>>>(url: T) -> Self {
34 Self {
35 test_url: "https://bilibili.com".to_string(),
36 proxy: None,
37 timeout: Duration::from_secs(3),
38 available_count: Arc::new(AtomicUsize::new(0)),
39 lb: SimpleLoadBalancer::new(url.into_iter().map(|v| v.as_ref().into()).collect()),
40 }
41 }
42
43 pub fn test_url(mut self, test_url: String) -> Self {
45 self.test_url = test_url;
46 self
47 }
48
49 pub fn timeout(mut self, timeout: Duration) -> Self {
51 self.timeout = timeout;
52 self
53 }
54
55 pub fn proxy(mut self, proxy: Proxy) -> Self {
57 self.proxy = Some(proxy);
58 self
59 }
60
61 pub fn available_count(&self) -> usize {
63 self.available_count.load(Ordering::Relaxed)
64 }
65
66 pub async fn extend<T: IntoIterator<Item = impl AsRef<str>>>(
71 &self,
72 urls: T,
73 ) -> anyhow::Result<()> {
74 let new_entries = urls
75 .into_iter()
76 .map(|v| Entry {
77 value: Arc::from(v.as_ref()),
78 })
79 .collect::<Vec<_>>();
80
81 self.lb
82 .update(async |v| {
83 let mut lock = v.entries.write().await;
84
85 lock.extend(new_entries.clone());
86 v.cursor.store(0, Ordering::Relaxed);
87 self.available_count.store(lock.len(), Ordering::Relaxed);
88
89 Ok(())
90 })
91 .await
92 }
93
94 pub async fn extend_check<T: IntoIterator<Item = impl AsRef<str>>>(
99 &self,
100 url: T,
101 retry_count: usize,
102 ) -> anyhow::Result<()> {
103 let new_entries = url
104 .into_iter()
105 .map(|v| Entry {
106 value: Arc::from(v.as_ref()),
107 })
108 .collect::<Vec<Entry<Arc<str>>>>();
109
110 self.lb
111 .update(async |v| {
112 let old_entries = {
113 let lock = v.entries.read().await;
114 let mut result = Vec::with_capacity(lock.len() + new_entries.len());
115
116 result.extend_from_slice(&new_entries);
117 result.extend(lock.iter().cloned());
118
119 result
120 };
121
122 let mut result = self.internal_check(&old_entries, retry_count).await?;
123
124 result.sort_by_key(|(_, latency)| *latency);
125
126 let mut new_entries = Vec::with_capacity(result.len());
127
128 for (index, _) in result {
129 new_entries.push(old_entries[index].clone());
130 }
131
132 let mut lock = v.entries.write().await;
133
134 *lock = new_entries;
135 v.cursor.store(0, Ordering::Relaxed);
136 self.available_count.store(lock.len(), Ordering::Relaxed);
137
138 Ok(())
139 })
140 .await
141 }
142
143 pub async fn check(&self, retry_count: usize) -> anyhow::Result<()> {
145 self.lb
146 .update(async |v| {
147 let old_entries = v.entries.read().await;
148
149 let mut result = self.internal_check(&old_entries, retry_count).await?;
150
151 result.sort_by_key(|(_, latency)| *latency);
152
153 let mut new_entries = Vec::with_capacity(result.len());
154
155 for (index, _) in result {
156 new_entries.push(old_entries[index].clone());
157 }
158
159 drop(old_entries);
160
161 let mut lock = v.entries.write().await;
162
163 *lock = new_entries;
164 v.cursor.store(0, Ordering::Relaxed);
165 self.available_count.store(lock.len(), Ordering::Relaxed);
166
167 Ok(())
168 })
169 .await
170 }
171
172 pub async fn spawn_check(
176 &self,
177 check_interval: Duration,
178 retry_count: usize,
179 ) -> anyhow::Result<JoinHandle<()>> {
180 self.check(retry_count).await?;
181
182 let this = self.clone();
183
184 Ok(spawn(async move {
185 loop {
186 sleep(check_interval).await;
187 _ = this.check(retry_count).await;
188 }
189 }))
190 }
191
192 async fn internal_check(
193 &self,
194 entries: &Vec<Entry<Arc<str>>>,
195 retry_count: usize,
196 ) -> anyhow::Result<Vec<(usize, u128)>> {
197 let mut result = Vec::new();
198
199 for (index, entry) in entries.iter().enumerate() {
200 let mut latency = None;
201
202 for _ in 0..=retry_count {
203 let client = if let Some(proxy) = self.proxy.clone() {
204 reqwest::ClientBuilder::new()
205 .proxy(proxy)
206 .proxy(Proxy::all(&*entry.value)?)
207 .timeout(self.timeout)
208 .build()?
209 } else {
210 reqwest::ClientBuilder::new()
211 .proxy(Proxy::all(&*entry.value)?)
212 .timeout(self.timeout)
213 .build()?
214 };
215
216 let start = Instant::now();
217
218 if let Ok(v) = client.get(&self.test_url).send().await {
219 if v.status().is_success() {
220 latency = Some(start.elapsed().as_millis());
221 break;
222 }
223 }
224 }
225
226 if let Some(v) = latency {
227 result.push((index, v));
228 }
229 }
230
231 Ok(result)
232 }
233}
234
235impl LoadBalancer<String> for ProxyPool {
236 async fn alloc(&self) -> String {
237 LoadBalancer::alloc(&self.lb).await.to_string()
238 }
239
240 fn try_alloc(&self) -> Option<String> {
241 LoadBalancer::try_alloc(&self.lb).map(|v| v.to_string())
242 }
243}
244
245#[async_trait]
246impl BoxLoadBalancer<String> for ProxyPool {
247 async fn alloc(&self) -> String {
248 LoadBalancer::alloc(self).await
249 }
250
251 fn try_alloc(&self) -> Option<String> {
252 LoadBalancer::try_alloc(self)
253 }
254}