1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
use std::{
    sync::{Arc, Mutex},
    thread,
    time::Instant,
};

use crate::{constants, errors::ApiError, opts::Opts, types::Proxy};

/// Hands out proxies on request, buffering whatever the api returned beyond what was asked for.
///
/// The time of the last api request lives behind an `Arc<Mutex<_>>`, so fetchers built from the
/// same timer (and clones of a fetcher) wait on one shared rate-limit delay.
#[derive(Clone, Debug)]
pub struct Fetcher {
    /// Time of the most recent api request; locked for the whole duration of a refill.
    last_fetched: Arc<Mutex<Instant>>,
    // TODO: store the url so we don't have to keep rebuilding it? It won't change while the fetcher
    // is running
    /// Options that get url-encoded into the query string of every request.
    opts: Opts,
    /// Proxies that were fetched but have not been handed out yet.
    proxies: Vec<Proxy>,
}

impl Fetcher {
    #[must_use]
    pub fn oneshot() -> Self {
        Self::oneshot_with_opts(Opts::default())
    }

    #[must_use]
    pub fn oneshot_with_opts(opts: Opts) -> Self {
        Session::new().fetcher_with_opts(opts)
    }

    #[must_use]
    fn new(last_fetched: Arc<Mutex<Instant>>, opts: Opts) -> Self {
        Self {
            last_fetched,
            opts,
            proxies: Vec::new(),
        }
    }

    pub fn try_get(&mut self, amount: usize) -> Result<Vec<Proxy>, ApiError> {
        if self.proxies.len() >= amount {
            // If there's enough in the current list then just go ahead and fulfill without locking
            Ok(self.proxies.split_off(self.proxies.len() - amount))
        } else {
            // Otherwise we need to lock and request the api
            let mut request = self.request_builder();

            let mut last_fetched = match self.last_fetched.lock() {
                Ok(last_fetched) => last_fetched,
                Err(err) => {
                    // If the lock was poisoned then play it safe and reset the timer
                    let mut poisioned = err.into_inner();
                    *poisioned = Instant::now();
                    poisioned
                }
            };

            while self.proxies.len() < amount {
                // Delay to prevent rate limiting
                let delta = Instant::now().duration_since(*last_fetched);
                if delta < constants::DELAY {
                    thread::sleep(constants::DELAY - delta);
                }

                let mut proxies = self.fetch(&mut request)?;
                self.proxies.append(&mut proxies);

                // Update the request time
                *last_fetched = Instant::now();
            }

            Ok(self.proxies.split_off(self.proxies.len() - amount))
        }
    }

    #[must_use]
    fn request_builder(&self) -> ureq::Request {
        let params = serde_urlencoded::to_string(&self.opts).unwrap_or_else(|_| {
            panic!(
                "Failed to serialize url, please raise an issue to address this: {}",
                constants::REPO_URI
            )
        });
        ureq::get(constants::API_URI).query_str(&params).build()
    }

    fn fetch(&self, request: &mut ureq::Request) -> Result<Vec<Proxy>, ApiError> {
        if cfg!(not(test)) {
            let resp = request.call();

            if resp.ok() {
                let resp_str = resp
                    .into_string()
                    .expect("Failed converting response to string");

                match serde_json::from_str::<crate::types::Response>(&resp_str) {
                    Ok(response) => Ok(response.data),
                    Err(_) => Err(ApiError::from(resp_str)),
                }
            } else {
                Err(ApiError::from(resp))
            }
        } else {
            use iso_country::Country;

            // TODO: is there a better way to mock the api response? It would be nice to test that
            // errors get interpreted right too. And if we could panic then we can test that the
            // mutex getting poisoned works right
            Ok(std::iter::repeat(Proxy {
                ip: std::net::Ipv4Addr::new(1, 2, 3, 4),
                port: 4321,
                country: Country::CA,
                last_checked: chrono::naive::NaiveDate::from_ymd(2020, 1, 1).and_hms(1, 1, 1),
                level: crate::types::Level::Anonymous,
                protocol: crate::types::Protocol::Http,
                time_to_connect: 21,
                supports: crate::types::Supports::default(),
            })
            .take(self.opts.limit as usize)
            .collect())
        }
    }

    #[must_use]
    pub fn drain(self) -> Vec<Proxy> {
        self.proxies
    }
}

/// Owner of the request timer that every [`Fetcher`] created from it shares.
#[derive(Debug)]
pub struct Session {
    /// Time of the most recent api request; a clone of this `Arc` is given to each fetcher.
    last_fetched: Arc<Mutex<Instant>>,
}

impl Session {
    /// Creates a new session; equivalent to [`Session::default`].
    #[must_use]
    pub fn new() -> Self {
        Default::default()
    }

    /// Creates a fetcher with the default options that shares this session's request timer.
    #[must_use]
    pub fn fetcher(&self) -> Fetcher {
        let default_opts = Opts::default();
        self.fetcher_with_opts(default_opts)
    }

    /// Creates a fetcher using `opts` that shares this session's request timer.
    #[must_use]
    pub fn fetcher_with_opts(&self, opts: Opts) -> Fetcher {
        let shared_timer = Arc::clone(&self.last_fetched);
        Fetcher::new(shared_timer, opts)
    }
}

impl Default for Session {
    /// Creates a session whose request timer is back-dated so the first request is not delayed.
    fn default() -> Self {
        let now = Instant::now();

        // Start far enough back to avoid delay. `Instant - Duration` panics when the result can't
        // be represented (e.g. the clock's origin is less than the delay in the past), so fall
        // back to the current time, which at worst costs a single delay on the first request
        let start = now.checked_sub(constants::DELAY).unwrap_or(now);

        Self {
            last_fetched: Arc::new(Mutex::new(start)),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    // XXX: need to handle clearing the env var key once that's added in
    // Number of proxies a single (mocked) request yields without and with an api key
    const FREE_LIMIT: usize = 5;
    const PREMIUM_LIMIT: usize = 20;

    /// Options carrying a placeholder api key, shared by every test that needs a keyed fetcher.
    fn premium_opts() -> Opts {
        Opts::builder().api_key("<key>").try_build().unwrap()
    }

    /// Checks that the right number of proxies gets handed out and buffered.
    mod functionality {
        use super::*;
        use crate::types::{Countries, Level};

        use iso_country::Country;

        #[test]
        fn api_key() {
            let mut fetcher = Fetcher::oneshot_with_opts(premium_opts());

            let single = fetcher.try_get(1).unwrap();
            let triple = fetcher.try_get(3).unwrap();
            let the_rest = fetcher.drain();

            assert_eq!(single.len(), 1);
            assert_eq!(triple.len(), 3);
            assert_eq!(PREMIUM_LIMIT, single.len() + triple.len() + the_rest.len());
        }

        #[test]
        fn keyless() {
            let mut fetcher = Fetcher::oneshot();

            let single = fetcher.try_get(1).unwrap();
            let triple = fetcher.try_get(3).unwrap();
            let the_rest = fetcher.drain();

            assert_eq!(single.len(), 1);
            assert_eq!(triple.len(), 3);
            assert_eq!(FREE_LIMIT, single.len() + triple.len() + the_rest.len());
        }

        #[test]
        fn multiple_requests() {
            // Multiple requests can be done with a single method call
            for i in 0..=2 * FREE_LIMIT {
                let mut fetcher = Fetcher::oneshot();
                let proxies = fetcher.try_get(i).unwrap();
                assert_eq!(proxies.len(), i);
            }
        }

        #[test]
        fn multiple_fetchers() {
            // Each fetcher should be independent
            let session = Session::new();
            let mut default = session.fetcher();
            let mut premium = session.fetcher_with_opts(premium_opts());
            let mut custom = session.fetcher_with_opts(
                Opts::builder()
                    .level(Level::Elite)
                    .cookies(true)
                    .countries(Countries::allowlist(&[Country::CA]))
                    .try_build()
                    .unwrap(),
            );

            let single = default.try_get(1).unwrap();
            let double = premium.try_get(2).unwrap();
            let triple = custom.try_get(3).unwrap();
            assert_eq!(single.len(), 1);
            assert_eq!(double.len(), 2);
            assert_eq!(triple.len(), 3);
            assert_eq!(default.drain().len(), FREE_LIMIT - single.len());
            assert_eq!(premium.drain().len(), PREMIUM_LIMIT - double.len());
            assert_eq!(custom.drain().len(), FREE_LIMIT - triple.len());
        }
    }

    /// Checks that requests are spaced out by the rate-limit delay, and only when needed.
    mod delays {
        use super::*;

        use std::time::Duration;

        const TEN_MILLISEC: Duration = Duration::from_millis(10);

        /// Runs `f` and asserts that it took `expected` +/- `delta`, returning whatever `f`
        /// returned.
        fn time_it<F, T>(f: F, (expected, delta): (Duration, Duration)) -> T
        where
            F: FnOnce() -> T,
        {
            let start = Instant::now();

            let result = f();

            let elapsed = start.elapsed();

            // `Duration` subtraction panics on underflow, so clamp the lower bound at zero
            let lower = expected.checked_sub(delta).unwrap_or_default();
            let upper = expected + delta;

            eprintln!("Elapsed time: {:?}", elapsed);
            eprintln!("Expected time: {:?} +/- {:?}", expected, delta);
            assert!(elapsed >= lower, "Too fast");
            assert!(elapsed <= upper, "Too slow");

            result
        }

        #[test]
        fn single_fetcher() {
            // Requesting the first `FREE_LIMIT` is done in one call
            let mut fetcher = time_it(
                || {
                    let mut fetcher = Fetcher::oneshot();
                    let _ = fetcher.try_get(FREE_LIMIT);
                    fetcher
                },
                // 10ms +/- 10ms
                (TEN_MILLISEC, TEN_MILLISEC),
            );

            // any more will take another call
            let mut fetcher = time_it(
                || {
                    let _ = fetcher.try_get(1);
                    fetcher
                },
                // delay +/- 10ms
                (constants::DELAY, TEN_MILLISEC),
            );

            // and since there are proxies in the internal list we can just use those
            time_it(
                || {
                    let _ = fetcher.try_get(1);
                    assert!(!fetcher.drain().is_empty());
                },
                // 10ms +/- 10ms
                (TEN_MILLISEC, TEN_MILLISEC),
            );
        }

        #[test]
        fn multiple_delays() {
            // Fulfilling 4 full requests should delay thrice
            time_it(
                || {
                    let session = Session::new();
                    let mut keyless = session.fetcher();
                    let mut premium = session.fetcher_with_opts(premium_opts());

                    let _ = keyless.try_get(2 * FREE_LIMIT);
                    let _ = premium.try_get(2 * PREMIUM_LIMIT);
                },
                // 3 * delay +/- 10ms
                (3 * constants::DELAY, TEN_MILLISEC),
            );
        }

        #[test]
        fn multiple_fetchers() {
            // Multiple fetchers should still have the delays coordinated
            let (mut fetcher1, mut fetcher2) = time_it(
                || {
                    let session = Session::new();
                    let mut fetcher1 = session.fetcher();
                    let mut fetcher2 = session.fetcher();

                    let _ = fetcher1.try_get(1);
                    let _ = fetcher2.try_get(1);

                    (fetcher1, fetcher2)
                },
                // delay +/- 10ms
                (constants::DELAY, TEN_MILLISEC),
            );

            // And each fetcher should now have an internal list to pull from with no delay
            time_it(
                || {
                    let _ = fetcher1.try_get(1);
                    let _ = fetcher2.try_get(1);
                    assert!(!fetcher1.drain().is_empty());
                    assert!(!fetcher2.drain().is_empty());
                },
                // 10ms +/- 10ms
                (TEN_MILLISEC, TEN_MILLISEC),
            );
        }

        #[test]
        fn multiple_threads() {
            // Multiple fetchers should still have the delays coordinated across threads
            time_it(
                || {
                    let session = Session::new();
                    let mut fetcher1 = session.fetcher();
                    let mut fetcher2 = session.fetcher();

                    let handle1 = thread::spawn(move || {
                        let _ = fetcher1.try_get(1);
                        assert!(!fetcher1.drain().is_empty());
                    });
                    let handle2 = thread::spawn(move || {
                        let _ = fetcher2.try_get(1);
                        assert!(!fetcher2.drain().is_empty());
                    });

                    handle1.join().expect("Failed to join thread");
                    handle2.join().expect("Failed to join thread");
                },
                // delay +/- 10ms
                (constants::DELAY, TEN_MILLISEC),
            );
        }
    }
}