1use std::{
7 sync::{Arc, Mutex},
8 thread,
9 time::Instant,
10};
11
12use crate::{
13 constants,
14 errors::ApiError,
15 opts::Opts,
16 proxy::{proxies_from_json, Proxy},
17 types::NaiveResponse,
18};
19
20lazy_static! {
21 static ref LAST_FETCHED: Arc<Mutex<Instant>> =
22 Arc::new(Mutex::new(Instant::now() - constants::DELAY));
23}
24
/// Fetches proxies from the API and buffers any surplus from a batch so later
/// calls to `try_get` can be served without another request.
#[derive(Clone, Debug)]
pub struct Fetcher {
    /// Options serialized into every request's query string; `is_premium()`
    /// decides whether requests bypass the shared keyless rate limit.
    opts: Opts,
    /// Proxies that have been fetched but not yet handed out.
    proxies: Vec<Proxy>,
}
35
36impl Fetcher {
37 pub fn new(opts: Opts) -> Self {
54 Self {
55 opts,
56 proxies: Vec::new(),
57 }
58 }
59
60 pub fn try_get(&mut self, amount: usize) -> Result<Vec<Proxy>, ApiError> {
75 if self.proxies.len() >= amount {
76 Ok(self.proxies.split_off(self.proxies.len() - amount))
78 } else {
79 let mut request = self.request_builder();
81
82 if self.opts.is_premium() {
83 while self.proxies.len() < amount {
86 let mut proxies = self.fetch(&mut request)?;
87 self.proxies.append(&mut proxies);
88 }
89 } else {
90 let mut last_fetched = match LAST_FETCHED.lock() {
93 Ok(last_fetched) => last_fetched,
94 Err(err) => {
95 let mut poisioned = err.into_inner();
97 *poisioned = Instant::now();
98 poisioned
99 }
100 };
101
102 while self.proxies.len() < amount {
103 let delta = Instant::now().duration_since(*last_fetched);
105 if delta < constants::DELAY {
106 thread::sleep(constants::DELAY - delta);
107 }
108
109 let mut proxies = self.fetch(&mut request)?;
110 self.proxies.append(&mut proxies);
111
112 *last_fetched = Instant::now();
114 }
115 }
116
117 Ok(self.proxies.split_off(self.proxies.len() - amount))
118 }
119 }
120
121 fn request_builder(&self) -> ureq::Request {
122 let params = serde_urlencoded::to_string(&self.opts).unwrap_or_else(|_| {
123 panic!(
124 "Failed to serialize url, please raise an issue to address this: {}",
125 constants::REPO_URI
126 )
127 });
128 ureq::get(constants::API_URI).query_str(¶ms).build()
129 }
130
131 fn fetch(&self, request: &mut ureq::Request) -> Result<Vec<Proxy>, ApiError> {
132 if cfg!(not(test)) {
133 let resp = request.call();
134 let naive_resp = NaiveResponse::from(resp);
135
136 if naive_resp.ok() {
137 proxies_from_json(&naive_resp.text).map_err(|_| ApiError::from(naive_resp))
138 } else {
139 Err(ApiError::from(naive_resp))
140 }
141 } else {
142 use chrono::naive::NaiveDate;
143 use iso_country::Country;
144
145 use crate::{
146 proxy::Supports,
147 types::{Level, Protocol},
148 };
149
150 use std::{
151 iter,
152 net::{Ipv4Addr, SocketAddrV4},
153 time::Duration,
154 };
155
156 Ok(iter::repeat(Proxy {
160 socket: SocketAddrV4::new(Ipv4Addr::new(1, 2, 3, 4), 4321),
161 country: Country::CA,
162 last_checked: NaiveDate::from_ymd(2020, 1, 1).and_hms(1, 1, 1),
163 level: Level::Anonymous,
164 protocol: Protocol::Http,
165 time_to_connect: Duration::from_secs(21),
166 supports: Supports::default(),
167 })
168 .take(self.opts.limit as usize)
169 .collect())
170 }
171 }
172
173 pub fn drain(self) -> Vec<Proxy> {
175 self.proxies
176 }
177}
178
179impl Default for Fetcher {
180 fn default() -> Self {
181 Self::new(Opts::default())
182 }
183}
184
#[cfg(test)]
mod tests {
    //! These tests exercise the test-build stub of `Fetcher::fetch`, which
    //! returns `opts.limit` dummy proxies without touching the network.
    //! Every test is `#[serial]` because the keyless rate limit is tracked in
    //! the process-wide `LAST_FETCHED` timestamp.

    use serial_test::serial;

    use super::*;

    // Batch sizes the tests expect for keyless and premium options.
    // NOTE(review): presumably these match the `limit` that `Opts` defaults to
    // with and without an API key; confirm against `Opts`.
    const FREE_LIMIT: usize = 5;
    const PREMIUM_LIMIT: usize = 20;

    /// Checks how many proxies are returned and how many stay buffered.
    mod functionality {
        use iso_country::Country;

        use super::*;
        use crate::types::{Countries, Level};

        // One premium batch serves both requests; the remainder is drained.
        #[test]
        #[serial]
        fn api_key() {
            let mut fetcher = Fetcher::new(Opts::builder().api_key("<key>".to_string()).build());

            let single = fetcher.try_get(1).unwrap();
            let triple = fetcher.try_get(3).unwrap();
            let the_rest = fetcher.drain();

            assert_eq!(single.len(), 1);
            assert_eq!(triple.len(), 3);
            assert_eq!(PREMIUM_LIMIT, single.len() + triple.len() + the_rest.len());
        }

        // One keyless batch serves both requests; the remainder is drained.
        #[test]
        #[serial]
        fn keyless() {
            let mut fetcher = Fetcher::default();

            let single = fetcher.try_get(1).unwrap();
            let triple = fetcher.try_get(3).unwrap();
            let the_rest = fetcher.drain();

            assert_eq!(single.len(), 1);
            assert_eq!(triple.len(), 3);
            assert_eq!(FREE_LIMIT, single.len() + triple.len() + the_rest.len());
        }

        // Requests from 0 up to two batches' worth, forcing multiple fetches.
        // `LAST_FETCHED` is never reset here, so each keyless fetch may wait
        // for the delay. Assuming batches of FREE_LIMIT, that is 15 requests
        // in total, so this test is slow.
        #[test]
        #[serial]
        fn multiple_requests() {
            for i in 0..=2 * FREE_LIMIT {
                let mut fetcher = Fetcher::default();
                let proxies = fetcher.try_get(i).unwrap();
                assert_eq!(proxies.len(), i);
            }
        }

        // Buffers are per-fetcher. Custom filters without an API key still
        // get the keyless batch size.
        #[test]
        #[serial]
        fn multiple_fetchers() {
            let mut default = Fetcher::default();
            let mut premium = Fetcher::new(Opts::builder().api_key("<key>".to_string()).build());
            let mut custom = Fetcher::new(
                Opts::builder()
                    .level(Level::Elite)
                    .cookies(true)
                    .countries(Countries::allow().country(Country::CA))
                    .build(),
            );

            let single = default.try_get(1).unwrap();
            let double = premium.try_get(2).unwrap();
            let triple = custom.try_get(3).unwrap();
            assert_eq!(single.len(), 1);
            assert_eq!(double.len(), 2);
            assert_eq!(triple.len(), 3);
            assert_eq!(default.drain().len(), FREE_LIMIT - single.len());
            assert_eq!(premium.drain().len(), PREMIUM_LIMIT - double.len());
            assert_eq!(custom.drain().len(), FREE_LIMIT - triple.len());
        }
    }

    /// Checks wall-clock timing of the keyless rate limit.
    mod delays {
        use std::time::Duration;

        use super::*;

        // Tolerance, and also the "practically instant" expectation.
        const TEN_MILLISEC: Duration = Duration::from_millis(10);

        // Pretends the last keyless request happened exactly `DELAY` ago, so
        // the next keyless request may go out immediately.
        fn reset_last_fetched() {
            let mut last_fetched = LAST_FETCHED.lock().unwrap();
            *last_fetched = Instant::now() - constants::DELAY;
        }

        // Runs `f`, asserts it took `expected` +/- `delta`, and returns its
        // result. `delta` must not exceed `expected`, or the `Duration`
        // subtraction below panics.
        fn time_it<F, T>(f: F, (expected, delta): (Duration, Duration)) -> T
        where
            F: FnOnce() -> T,
        {
            let start = Instant::now();

            let result = f();

            let end = Instant::now();
            let elapsed = end.duration_since(start);
            eprintln!("Elapsed time: {:?}", elapsed);
            eprintln!("Expected time: {:?} +/- {:?}", expected, delta);
            assert!(elapsed >= (expected - delta), "Too fast");
            assert!(elapsed <= (expected + delta), "Too slow");

            result
        }

        #[test]
        #[serial]
        fn single_fetcher() {
            // After a reset, the first fetch goes out immediately.
            let mut fetcher = time_it(
                || {
                    reset_last_fetched();
                    let mut fetcher = Fetcher::default();
                    let _ = fetcher.try_get(FREE_LIMIT);
                    fetcher
                },
                (TEN_MILLISEC, TEN_MILLISEC),
            );

            // The whole batch was handed out, so this needs a new fetch and
            // waits out the delay.
            let mut fetcher = time_it(
                || {
                    let _ = fetcher.try_get(1);
                    fetcher
                },
                (constants::DELAY, TEN_MILLISEC),
            );

            // Served from the buffer left by the previous fetch: no wait.
            time_it(
                || {
                    let _ = fetcher.try_get(1);
                    assert!(!fetcher.drain().is_empty());
                },
                (TEN_MILLISEC, TEN_MILLISEC),
            );
        }

        // keyless1 needs two batches (immediate, then +DELAY). The premium
        // fetcher is not throttled. keyless2 needs two more throttled batches
        // (+2 * DELAY). Total: 3 * DELAY.
        #[test]
        #[serial]
        fn multiple_delays() {
            time_it(
                || {
                    reset_last_fetched();

                    let mut keyless1 = Fetcher::default();
                    let mut keyless2 = Fetcher::default();
                    let mut premium =
                        Fetcher::new(Opts::builder().api_key("<key>".to_string()).build());

                    let _ = keyless1.try_get(2 * FREE_LIMIT);
                    let _ = premium.try_get(2 * PREMIUM_LIMIT);
                    let _ = keyless2.try_get(2 * FREE_LIMIT);
                },
                (3 * constants::DELAY, TEN_MILLISEC),
            );
        }

        // The rate limit is shared: the second fetcher waits for the first.
        // Afterwards both are served from their own buffers with no wait.
        #[test]
        #[serial]
        fn multiple_fetchers() {
            let (mut fetcher1, mut fetcher2) = time_it(
                || {
                    reset_last_fetched();

                    let mut fetcher1 = Fetcher::default();
                    let mut fetcher2 = Fetcher::default();

                    let _ = fetcher1.try_get(1);
                    let _ = fetcher2.try_get(1);

                    (fetcher1, fetcher2)
                },
                (constants::DELAY, TEN_MILLISEC),
            );

            time_it(
                || {
                    let _ = fetcher1.try_get(1);
                    let _ = fetcher2.try_get(1);
                    assert!(!fetcher1.drain().is_empty());
                    assert!(!fetcher2.drain().is_empty());
                },
                (TEN_MILLISEC, TEN_MILLISEC),
            );
        }

        // Same as above across threads: whichever thread acquires the lock
        // second waits one DELAY.
        #[test]
        #[serial]
        fn multiple_threads() {
            time_it(
                || {
                    reset_last_fetched();

                    let mut fetcher1 = Fetcher::default();
                    let mut fetcher2 = Fetcher::default();

                    let handle1 = thread::spawn(move || {
                        let _ = fetcher1.try_get(1);
                        assert!(!fetcher1.drain().is_empty());
                    });
                    let handle2 = thread::spawn(move || {
                        let _ = fetcher2.try_get(1);
                        assert!(!fetcher2.drain().is_empty());
                    });

                    handle1.join().expect("Failed to join thread");
                    handle2.join().expect("Failed to join thread");
                },
                (constants::DELAY, TEN_MILLISEC),
            );
        }
    }
}