lead_oxide/
fetcher.rs

1//! [`Fetcher`][Fetcher] is the entrypoint to requesting proxies from the API.
2//!
3//! A `Fetcher` will return any proxies that match the provided [`Opts`][crate::opts], which by
4//! default will return any proxies on the listing.
5
6use std::{
7    sync::{Arc, Mutex},
8    thread,
9    time::Instant,
10};
11
12use crate::{
13    constants,
14    errors::ApiError,
15    opts::Opts,
16    proxy::{proxies_from_json, Proxy},
17    types::NaiveResponse,
18};
19
20lazy_static! {
21    static ref LAST_FETCHED: Arc<Mutex<Instant>> =
22        Arc::new(Mutex::new(Instant::now() - constants::DELAY));
23}
24
25/// The entrypoint into the API.
26///
27/// A `Fetcher` represents a set of filters for the specific types of
28/// proxies that it returns along with an internal list to ensure that the most proxies possible are
29/// being returned by the API to alleviate the rate-limiting and daily-limit.
30#[derive(Clone, Debug)]
31pub struct Fetcher {
32    opts: Opts,
33    proxies: Vec<Proxy>,
34}
35
36impl Fetcher {
37    /// Creates a new `Fetcher` with a given set of `Opts`. If you don't care about any attributes
38    /// of the returned proxies then `Fetcher::default` is a convenience method for
39    /// `Fetcher::new(Opts::default())`.
40    ///
41    /// ```
42    /// use lead_oxide::{opts::Opts, fetcher::Fetcher, types::{Level, Protocol}};
43    ///
44    /// let basic_fetcher = Fetcher::default();
45    /// let custom_fetcher = Fetcher::new(
46    ///     Opts::builder()
47    ///         .level(Level::Elite)
48    ///         .protocol(Protocol::Http)
49    ///         .https(true)
50    ///         .build()
51    /// );
52    /// ```
53    pub fn new(opts: Opts) -> Self {
54        Self {
55            opts,
56            proxies: Vec::new(),
57        }
58    }
59
60    /// Attempts to get the specified amount of proxies from the API, returning an error if there
61    /// was an issue with the API. Any proxies returned before an error was encountered will still
62    /// be stored in the interal buffer, so it's possible to use this to fully exhaust your daily
63    /// limit.
64    ///
65    /// ```no_run
66    /// use lead_oxide::fetcher::Fetcher;
67    ///
68    /// let mut fetcher = Fetcher::default();
69    /// // Exhaust the daily limit
70    /// assert!(fetcher.try_get(1_000).is_err());
71    /// // Drain the full list
72    /// let proxies = fetcher.drain();
73    /// ```
74    pub fn try_get(&mut self, amount: usize) -> Result<Vec<Proxy>, ApiError> {
75        if self.proxies.len() >= amount {
76            // If there's enough in the current list then just go ahead and fulfill without locking
77            Ok(self.proxies.split_off(self.proxies.len() - amount))
78        } else {
79            // Otherwise we need to lock and request the api
80            let mut request = self.request_builder();
81
82            if self.opts.is_premium() {
83                // Don't need to mess with any delays if we're using an api key. (This information
84                // was based off emailing the dev. I never got an api key to test)
85                while self.proxies.len() < amount {
86                    let mut proxies = self.fetch(&mut request)?;
87                    self.proxies.append(&mut proxies);
88                }
89            } else {
90                // If we don't have an api key then we need to coordinate delays to ensure we don't
91                // do more than one request per `constants::DELAY`
92                let mut last_fetched = match LAST_FETCHED.lock() {
93                    Ok(last_fetched) => last_fetched,
94                    Err(err) => {
95                        // If the lock was poisoned then play it safe and reset the timer
96                        let mut poisioned = err.into_inner();
97                        *poisioned = Instant::now();
98                        poisioned
99                    }
100                };
101
102                while self.proxies.len() < amount {
103                    // Delay to prevent rate limiting
104                    let delta = Instant::now().duration_since(*last_fetched);
105                    if delta < constants::DELAY {
106                        thread::sleep(constants::DELAY - delta);
107                    }
108
109                    let mut proxies = self.fetch(&mut request)?;
110                    self.proxies.append(&mut proxies);
111
112                    // Update the request time
113                    *last_fetched = Instant::now();
114                }
115            }
116
117            Ok(self.proxies.split_off(self.proxies.len() - amount))
118        }
119    }
120
121    fn request_builder(&self) -> ureq::Request {
122        let params = serde_urlencoded::to_string(&self.opts).unwrap_or_else(|_| {
123            panic!(
124                "Failed to serialize url, please raise an issue to address this: {}",
125                constants::REPO_URI
126            )
127        });
128        ureq::get(constants::API_URI).query_str(&params).build()
129    }
130
131    fn fetch(&self, request: &mut ureq::Request) -> Result<Vec<Proxy>, ApiError> {
132        if cfg!(not(test)) {
133            let resp = request.call();
134            let naive_resp = NaiveResponse::from(resp);
135
136            if naive_resp.ok() {
137                proxies_from_json(&naive_resp.text).map_err(|_| ApiError::from(naive_resp))
138            } else {
139                Err(ApiError::from(naive_resp))
140            }
141        } else {
142            use chrono::naive::NaiveDate;
143            use iso_country::Country;
144
145            use crate::{
146                proxy::Supports,
147                types::{Level, Protocol},
148            };
149
150            use std::{
151                iter,
152                net::{Ipv4Addr, SocketAddrV4},
153                time::Duration,
154            };
155
156            // TODO: is there a better way to mock the api response? It would be nice to test that
157            // errors get interpreted right too. And if we could panic then we can test that the
158            // mutex getting poisoned works right
159            Ok(iter::repeat(Proxy {
160                socket: SocketAddrV4::new(Ipv4Addr::new(1, 2, 3, 4), 4321),
161                country: Country::CA,
162                last_checked: NaiveDate::from_ymd(2020, 1, 1).and_hms(1, 1, 1),
163                level: Level::Anonymous,
164                protocol: Protocol::Http,
165                time_to_connect: Duration::from_secs(21),
166                supports: Supports::default(),
167            })
168            .take(self.opts.limit as usize)
169            .collect())
170        }
171    }
172
173    /// Consumes the `Fetcher` returning any proxies still left in the internal list.
174    pub fn drain(self) -> Vec<Proxy> {
175        self.proxies
176    }
177}
178
179impl Default for Fetcher {
180    fn default() -> Self {
181        Self::new(Opts::default())
182    }
183}
184
185#[cfg(test)]
186mod tests {
187    use serial_test::serial;
188
189    use super::*;
190
    // XXX: need to handle clearing the env var key once that's added in
    // Number of proxies the tests below expect from a single request made without an api key
    const FREE_LIMIT: usize = 5;
    // Number of proxies the tests below expect from a single request made with an api key
    const PREMIUM_LIMIT: usize = 20;
194
195    mod functionality {
196        use iso_country::Country;
197
198        use super::*;
199        use crate::types::{Countries, Level};
200
201        #[test]
202        #[serial]
203        fn api_key() {
204            let mut fetcher = Fetcher::new(Opts::builder().api_key("<key>".to_string()).build());
205
206            let single = fetcher.try_get(1).unwrap();
207            let triple = fetcher.try_get(3).unwrap();
208            let the_rest = fetcher.drain();
209
210            assert_eq!(single.len(), 1);
211            assert_eq!(triple.len(), 3);
212            assert_eq!(PREMIUM_LIMIT, single.len() + triple.len() + the_rest.len());
213        }
214
215        #[test]
216        #[serial]
217        fn keyless() {
218            let mut fetcher = Fetcher::default();
219
220            let single = fetcher.try_get(1).unwrap();
221            let triple = fetcher.try_get(3).unwrap();
222            let the_rest = fetcher.drain();
223
224            assert_eq!(single.len(), 1);
225            assert_eq!(triple.len(), 3);
226            assert_eq!(FREE_LIMIT, single.len() + triple.len() + the_rest.len());
227        }
228
229        #[test]
230        #[serial]
231        fn multiple_requests() {
232            // Multiple requests can be done with a single method call
233            for i in 0..=2 * FREE_LIMIT {
234                let mut fetcher = Fetcher::default();
235                let proxies = fetcher.try_get(i).unwrap();
236                assert_eq!(proxies.len(), i);
237            }
238        }
239
240        #[test]
241        #[serial]
242        fn multiple_fetchers() {
243            // Each fetcher should be independent
244            let mut default = Fetcher::default();
245            let mut premium = Fetcher::new(Opts::builder().api_key("<key>".to_string()).build());
246            let mut custom = Fetcher::new(
247                Opts::builder()
248                    .level(Level::Elite)
249                    .cookies(true)
250                    .countries(Countries::allow().country(Country::CA))
251                    .build(),
252            );
253
254            let single = default.try_get(1).unwrap();
255            let double = premium.try_get(2).unwrap();
256            let triple = custom.try_get(3).unwrap();
257            assert_eq!(single.len(), 1);
258            assert_eq!(double.len(), 2);
259            assert_eq!(triple.len(), 3);
260            assert_eq!(default.drain().len(), FREE_LIMIT - single.len());
261            assert_eq!(premium.drain().len(), PREMIUM_LIMIT - double.len());
262            assert_eq!(custom.drain().len(), FREE_LIMIT - triple.len());
263        }
264    }
265
266    mod delays {
267        use std::time::Duration;
268
269        use super::*;
270
271        const TEN_MILLISEC: Duration = Duration::from_millis(10);
272
273        // TODO: do this with a fixture
274        fn reset_last_fetched() {
275            let mut last_fetched = LAST_FETCHED.lock().unwrap();
276            *last_fetched = Instant::now() - constants::DELAY;
277        }
278
279        // Helper function for ensuring runtime of a `FnOnce`
280        fn time_it<F, T>(f: F, (expected, delta): (Duration, Duration)) -> T
281        where
282            F: FnOnce() -> T,
283        {
284            let start = Instant::now();
285
286            let result = f();
287
288            let end = Instant::now();
289            let elapsed = end.duration_since(start);
290            eprintln!("Elapsed time: {:?}", elapsed);
291            eprintln!("Expected time: {:?} +/- {:?}", expected, delta);
292            assert!(elapsed >= (expected - delta), "Too fast");
293            assert!(elapsed <= (expected + delta), "Too slow");
294
295            result
296        }
297
298        #[test]
299        #[serial]
300        fn single_fetcher() {
301            // Requesting the first `FREE_LIMIT` is done in one call
302            let mut fetcher = time_it(
303                || {
304                    reset_last_fetched();
305                    let mut fetcher = Fetcher::default();
306                    let _ = fetcher.try_get(FREE_LIMIT);
307                    fetcher
308                },
309                // 10ms +/- 10ms
310                (TEN_MILLISEC, TEN_MILLISEC),
311            );
312
313            // any more will take another call
314            let mut fetcher = time_it(
315                || {
316                    let _ = fetcher.try_get(1);
317                    fetcher
318                },
319                // delay +/- 10ms
320                (constants::DELAY, TEN_MILLISEC),
321            );
322
323            // and since there are proxies in the internal list we can just use those
324            time_it(
325                || {
326                    let _ = fetcher.try_get(1);
327                    assert!(!fetcher.drain().is_empty());
328                },
329                // 10ms +/- 10ms
330                (TEN_MILLISEC, TEN_MILLISEC),
331            );
332        }
333
334        #[test]
335        #[serial]
336        fn multiple_delays() {
337            // Fulfilling 4 full requests should delay thrice
338            time_it(
339                || {
340                    reset_last_fetched();
341
342                    let mut keyless1 = Fetcher::default();
343                    let mut keyless2 = Fetcher::default();
344                    // TODO: this option is used several times. Reuse somehow?
345                    let mut premium =
346                        Fetcher::new(Opts::builder().api_key("<key>".to_string()).build());
347
348                    let _ = keyless1.try_get(2 * FREE_LIMIT);
349                    // Even while the keyless ones would be delayed, the premium is not
350                    let _ = premium.try_get(2 * PREMIUM_LIMIT);
351                    let _ = keyless2.try_get(2 * FREE_LIMIT);
352                },
353                // 3 * delay +/- 10ms
354                (3 * constants::DELAY, TEN_MILLISEC),
355            );
356        }
357
358        #[test]
359        #[serial]
360        fn multiple_fetchers() {
361            // Multiple fetchers should still have the delays coordinated
362            let (mut fetcher1, mut fetcher2) = time_it(
363                || {
364                    reset_last_fetched();
365
366                    let mut fetcher1 = Fetcher::default();
367                    let mut fetcher2 = Fetcher::default();
368
369                    let _ = fetcher1.try_get(1);
370                    let _ = fetcher2.try_get(1);
371
372                    (fetcher1, fetcher2)
373                },
374                // delay +/- 10ms
375                (constants::DELAY, TEN_MILLISEC),
376            );
377
378            // And each fetcher should now have an internal list to pull from with no delay
379            time_it(
380                || {
381                    let _ = fetcher1.try_get(1);
382                    let _ = fetcher2.try_get(1);
383                    assert!(!fetcher1.drain().is_empty());
384                    assert!(!fetcher2.drain().is_empty());
385                },
386                // 10ms +/- 10ms
387                (TEN_MILLISEC, TEN_MILLISEC),
388            );
389        }
390
391        #[test]
392        #[serial]
393        fn multiple_threads() {
394            // Multiple fetchers should still have the delays coordinated across threads
395            time_it(
396                || {
397                    reset_last_fetched();
398
399                    let mut fetcher1 = Fetcher::default();
400                    let mut fetcher2 = Fetcher::default();
401
402                    let handle1 = thread::spawn(move || {
403                        let _ = fetcher1.try_get(1);
404                        assert!(!fetcher1.drain().is_empty());
405                    });
406                    let handle2 = thread::spawn(move || {
407                        let _ = fetcher2.try_get(1);
408                        assert!(!fetcher2.drain().is_empty());
409                    });
410
411                    handle1.join().expect("Failed to join thread");
412                    handle2.join().expect("Failed to join thread");
413                },
414                // delay +/- 10ms
415                (constants::DELAY, TEN_MILLISEC),
416            );
417        }
418    }
419}