gr/
http.rs

1pub mod throttle;
2
3use crate::api_traits::ApiOperation;
4use crate::backoff::Backoff;
5use crate::cache::{Cache, CacheState};
6use crate::config::ConfigProperties;
7use crate::error::GRError;
8use crate::io::{
9    parse_page_headers, parse_ratelimit_headers, FlowControlHeaders, HttpResponse, HttpRunner,
10    RateLimitHeader, ResponseField,
11};
12use crate::time::{self, now_epoch_seconds, Seconds};
13use crate::{api_defaults, error, log_debug, log_error};
14use crate::{log_info, Result};
15use serde::{Deserialize, Serialize};
16use std::borrow::Borrow;
17use std::collections::{hash_map, HashMap};
18use std::iter::Iterator;
19use std::rc::Rc;
20use std::sync::{Arc, Mutex};
21use throttle::{ThrottleStrategy, ThrottleStrategyType};
22use ureq::Error;
23
24pub struct Client<C> {
25    cache: C,
26    config: Arc<dyn ConfigProperties>,
27    refresh_cache: bool,
28    time_to_ratelimit_reset: Mutex<Seconds>,
29    remaining_requests: Mutex<u32>,
30}
31
32// TODO: provide builder pattern for Client.
33impl<C> Client<C> {
34    pub fn new(cache: C, config: Arc<dyn ConfigProperties>, refresh_cache: bool) -> Self {
35        let remaining_requests = Mutex::new(api_defaults::DEFAULT_NUMBER_REQUESTS_MINUTE);
36        let time_to_ratelimit_reset = Mutex::new(now_epoch_seconds() + Seconds::new(60));
37        Client {
38            cache,
39            refresh_cache,
40            config,
41            time_to_ratelimit_reset,
42            remaining_requests,
43        }
44    }
45
46    fn submit<T: Serialize>(&self, request: &Request<T>) -> Result<HttpResponse> {
47        let ureq_req = match request.method {
48            Method::GET => ureq::get(request.url()),
49            Method::HEAD => ureq::head(request.url()),
50            Method::POST => ureq::post(request.url()),
51            Method::PATCH => ureq::patch(request.url()),
52            Method::PUT => ureq::put(request.url()),
53        };
54        let ureq_req = request
55            .headers()
56            .iter()
57            .fold(ureq_req, |req, (key, value)| req.set(key, value));
58        let call = || -> std::result::Result<ureq::Response, ureq::Error> {
59            match request.method {
60                Method::GET | Method::HEAD => ureq_req.call(),
61                _ => ureq_req.send_json(serde_json::to_value(request.body).unwrap()),
62            }
63        };
64        match call() {
65            Ok(response) | Err(Error::Status(_, response)) => {
66                let status = response.status().into();
67                // Grab headers for pagination and cache.
68                let headers =
69                    response
70                        .headers_names()
71                        .iter()
72                        .fold(Headers::new(), |mut headers, name| {
73                            headers.set(
74                                name.to_lowercase(),
75                                response.header(name.as_str()).unwrap().to_string(),
76                            );
77                            headers
78                        });
79                let rate_limit_header = Rc::new(parse_ratelimit_headers(Some(&headers)));
80                let page_header = Rc::new(parse_page_headers(Some(&headers)));
81                let flow_control_headers = FlowControlHeaders::new(page_header, rate_limit_header);
82                // log debug response headers
83                log_debug!("Response headers: {:?}", headers);
84                let body = response.into_string().unwrap_or_default();
85                let mut response = HttpResponse::builder()
86                    .status(status)
87                    .body(body)
88                    .headers(headers)
89                    .flow_control_headers(flow_control_headers)
90                    .build()
91                    .unwrap();
92                self.handle_rate_limit(&mut response)?;
93                Ok(response)
94            }
95            Err(err) => Err(GRError::HttpTransportError(err.to_string()).into()),
96        }
97    }
98}
99
100impl<C> Client<C> {
101    fn handle_rate_limit(&self, response: &mut HttpResponse) -> Result<()> {
102        if let Some(headers) = response.get_ratelimit_headers().borrow() {
103            if headers.remaining <= self.config.rate_limit_remaining_threshold() {
104                log_error!("Rate limit threshold reached");
105                return Err(error::GRError::RateLimitExceeded(*headers).into());
106            }
107            Ok(())
108        } else {
109            // The remote does not provide rate limit headers, so we apply our
110            // defaults for safety. Official github.com and gitlab.com do, so
111            // that could be an internal/dev, etc... instance setup without rate
112            // limits.
113            log_info!("Rate limit headers not provided by remote, using defaults");
114            default_rate_limit_handler(
115                response,
116                &self.config,
117                &self.time_to_ratelimit_reset,
118                &self.remaining_requests,
119                now_epoch_seconds,
120            )
121        }
122    }
123}
124
125fn default_rate_limit_handler(
126    response: &mut HttpResponse,
127    config: &Arc<dyn ConfigProperties>,
128    time_to_ratelimit_reset: &Mutex<Seconds>,
129    remaining_requests: &Mutex<u32>,
130    now_epoch_seconds: fn() -> Seconds,
131) -> Result<()> {
132    // bail if we are below the security threshold for remaining requests
133
134    if let Ok(remaining_requests) = remaining_requests.lock() {
135        if *remaining_requests <= config.rate_limit_remaining_threshold() {
136            let time_to_ratelimit_reset =
137                *time_to_ratelimit_reset.lock().unwrap() - now_epoch_seconds();
138            log_error!(
139                "Remote does not provide rate limit headers, \
140                            so the default rate limit of {} per minute has been \
141                            exceeded. Try again in {} seconds",
142                api_defaults::DEFAULT_NUMBER_REQUESTS_MINUTE,
143                time_to_ratelimit_reset
144            );
145            let rate_limit_header = RateLimitHeader::new(
146                *remaining_requests,
147                time_to_ratelimit_reset,
148                time_to_ratelimit_reset,
149            );
150            response.update_rate_limit_headers(rate_limit_header);
151            return Err(error::GRError::RateLimitExceeded(RateLimitHeader::new(
152                *remaining_requests,
153                time_to_ratelimit_reset,
154                time_to_ratelimit_reset,
155            ))
156            .into());
157        }
158    } else {
159        return Err(error::GRError::ApplicationError(
160            "http module rate limiting - Cannot read remaining \
161                    http requests"
162                .to_string(),
163        )
164        .into());
165    }
166
167    if let Ok(mut remaining_requests) = remaining_requests.lock() {
168        // if elapsed time is greater than 60 seconds, then reset
169        // remaining requests to default
170        let current_time = now_epoch_seconds();
171        let mut time_to_reset = time_to_ratelimit_reset.lock().unwrap();
172        if current_time > *time_to_reset {
173            *remaining_requests = api_defaults::DEFAULT_NUMBER_REQUESTS_MINUTE;
174            *time_to_reset = current_time + Seconds::new(60);
175        }
176        *remaining_requests -= 1;
177        // Using time to seconds relative as counter gets reset every minute.
178        log_info!(
179            "Remaining requests: {}, reset in: {} seconds",
180            *remaining_requests,
181            time::epoch_to_seconds_relative(*time_to_reset)
182        );
183    } else {
184        return Err(error::GRError::ApplicationError(
185            "http module rate limiting - Cannot decrease counter \
186                        number of requests pending"
187                .to_string(),
188        )
189        .into());
190    }
191
192    Ok(())
193}
194
195#[derive(Default)]
196pub struct Resource {
197    pub url: String,
198    pub api_operation: Option<ApiOperation>,
199}
200
201impl Resource {
202    pub fn new(url: &str, api_operation: Option<ApiOperation>) -> Self {
203        Resource {
204            url: url.to_string(),
205            api_operation,
206        }
207    }
208}
209
210#[derive(Serialize, Clone, Debug, Default)]
211pub struct Body<T>(HashMap<String, T>);
212
213impl<T> Body<T> {
214    pub fn new() -> Self {
215        Body(HashMap::new())
216    }
217
218    pub fn add<K: Into<String>>(&mut self, key: K, value: T) {
219        self.0.insert(key.into(), value);
220    }
221}
222
223#[derive(Clone, Debug, Default, Serialize, Deserialize)]
224pub struct Headers(HashMap<String, String>);
225
226impl Headers {
227    pub fn new() -> Self {
228        Headers(HashMap::new())
229    }
230
231    pub fn set<K: Into<String>, V: Into<String>>(&mut self, key: K, value: V) {
232        self.0.insert(key.into(), value.into());
233    }
234
235    pub fn get(&self, key: &str) -> Option<&String> {
236        self.0.get(key)
237    }
238
239    pub fn iter(&self) -> hash_map::Iter<String, String> {
240        self.0.iter()
241    }
242
243    pub fn extend(&mut self, headers: Headers) {
244        for (key, value) in headers.iter() {
245            self.0.insert(key.clone(), value.clone());
246        }
247    }
248}
249
250#[derive(Builder)]
251#[builder(pattern = "owned")]
252pub struct Request<'a, T> {
253    #[builder(setter(into, strip_option), default)]
254    pub body: Option<&'a Body<T>>,
255    #[builder(default)]
256    headers: Headers,
257    pub method: Method,
258    pub resource: Resource,
259    #[builder(setter(into, strip_option), default)]
260    pub max_pages: Option<i64>,
261}
262
263impl<'a, T> Request<'a, T> {
264    pub fn builder() -> RequestBuilder<'a, T> {
265        RequestBuilder::default()
266    }
267
268    pub fn new(url: &str, method: Method) -> Self {
269        Request {
270            body: None,
271            headers: Headers::new(),
272            method,
273            resource: Resource::new(url, None),
274            max_pages: None,
275        }
276    }
277
278    pub fn set_max_pages(&mut self, max_pages: i64) {
279        self.max_pages = Some(max_pages);
280    }
281
282    pub fn with_api_operation(mut self, api_operation: ApiOperation) -> Self {
283        self.resource.api_operation = Some(api_operation);
284        self
285    }
286
287    pub fn api_operation(&self) -> &Option<ApiOperation> {
288        &self.resource.api_operation
289    }
290
291    pub fn set_header(&mut self, key: &str, value: &str) {
292        self.headers.set(key.to_string(), value.to_string());
293    }
294
295    pub fn set_headers(&mut self, headers: Headers) {
296        self.headers = headers;
297    }
298
299    pub fn set_url(&mut self, url: &str) {
300        self.resource.url = url.to_string();
301    }
302
303    pub fn url(&self) -> &str {
304        &self.resource.url
305    }
306
307    pub fn headers(&self) -> &Headers {
308        &self.headers
309    }
310}
311
312#[derive(Clone, Debug, Default, Eq, PartialEq)]
313pub enum Method {
314    #[default]
315    HEAD,
316    GET,
317    POST,
318    PUT,
319    PATCH,
320}
321
322impl<C: Cache<Resource>> HttpRunner for Client<C> {
323    type Response = HttpResponse;
324
325    fn run<T: Serialize>(&self, cmd: &mut Request<T>) -> Result<Self::Response> {
326        match cmd.method {
327            Method::GET => {
328                let mut default_response = HttpResponse::builder().build().unwrap();
329                match self.cache.get(&cmd.resource) {
330                    Ok(CacheState::Fresh(mut response)) => {
331                        log_debug!("Cache fresh for {}", cmd.resource.url);
332                        if !self.refresh_cache {
333                            log_debug!("Returning local cached response");
334                            response.local_cache = true;
335                            return Ok(response);
336                        }
337                        default_response = response;
338                    }
339                    Ok(CacheState::Stale(response)) => {
340                        log_debug!("Cache stale for {}", cmd.resource.url);
341                        default_response = response;
342                    }
343                    Ok(CacheState::None) => {}
344                    Err(err) => return Err(err),
345                }
346                // check ETag is available in the default response.
347                // If so, then we need to set the If-None-Match header.
348                if let Some(etag) = default_response.get_etag() {
349                    cmd.set_header("If-None-Match", etag);
350                }
351                // If status is 304, then we need to return the cached response.
352                let response = self.submit(cmd)?;
353                if response.status == 304 {
354                    // Update cache with latest headers. This effectively
355                    // refreshes the cache and we won't hit this until per api
356                    // cache expiration as declared in the config.
357                    self.cache
358                        .update(&cmd.resource, &response, &ResponseField::Headers)?;
359                    return Ok(default_response);
360                }
361                self.cache.set(&cmd.resource, &response).unwrap();
362                Ok(response)
363            }
364            _ => Ok(self.submit(cmd)?),
365        }
366    }
367
368    fn api_max_pages<T: Serialize>(&self, cmd: &Request<T>) -> u32 {
369        let max_pages = self
370            .config
371            .get_max_pages(cmd.resource.api_operation.as_ref().unwrap());
372        max_pages
373    }
374}
375
376pub struct Paginator<'a, R, T> {
377    request: Request<'a, T>,
378    page_url: Option<String>,
379    iter: u32,
380    backoff: Backoff<'a, R>,
381    throttler: Box<dyn ThrottleStrategy>,
382    max_pages: u32,
383}
384
385impl<'a, R: HttpRunner, T: Serialize> Paginator<'a, R, T> {
386    pub fn new(
387        runner: &'a Arc<R>,
388        request: Request<'a, T>,
389        page_url: &str,
390        backoff: Backoff<'a, R>,
391        throttle_strategy: Box<dyn ThrottleStrategy>,
392    ) -> Self {
393        let max_pages = if let Some(max_pages) = request.max_pages {
394            max_pages as u32
395        } else {
396            runner.api_max_pages(&request)
397        };
398        Paginator {
399            request,
400            page_url: Some(page_url.to_string()),
401            iter: 0,
402            backoff,
403            throttler: throttle_strategy,
404            max_pages,
405        }
406    }
407}
408
409impl<T: Serialize, R: HttpRunner<Response = HttpResponse>> Iterator for Paginator<'_, R, T> {
410    type Item = Result<HttpResponse>;
411
412    fn next(&mut self) -> Option<Self::Item> {
413        if let Some(page_url) = &self.page_url {
414            if self.iter >= self.max_pages {
415                return None;
416            }
417            if self.iter >= 1 {
418                self.request.set_url(page_url);
419            }
420            log_info!("Requesting page: {}", self.iter + 1);
421            log_info!("URL: {}", self.request.url());
422
423            let response = match self.backoff.retry_on_error(&mut self.request) {
424                Ok(response) => {
425                    if let Some(page_headers) = response.get_page_headers().borrow() {
426                        match (page_headers.next_page(), page_headers.last_page()) {
427                            (Some(next), _) => self.page_url = Some(next.url().to_string()),
428                            (None, _) => self.page_url = None,
429                        }
430                    } else {
431                        self.page_url = None;
432                    };
433                    Ok(response)
434                }
435                Err(err) => {
436                    self.page_url = None;
437                    Err(err)
438                }
439            };
440            self.iter += 1;
441            if self.iter < self.max_pages && self.page_url.is_some() {
442                let response = response.as_ref().unwrap();
443                // Technically no need to check ok on response, as page_url is Some
444                // (response was Ok)
445                if !response.local_cache {
446                    if self.throttler.strategy() == ThrottleStrategyType::AutoRate {
447                        if self.iter >= api_defaults::ENGAGE_AUTORATE_THROTTLING_THRESHOLD {
448                            self.throttler
449                                .throttle(Some(response.get_flow_control_headers()));
450                        }
451                    } else {
452                        self.throttler
453                            .throttle(Some(response.get_flow_control_headers()));
454                    }
455                }
456            }
457            return Some(response);
458        }
459        None
460    }
461}
462
463#[cfg(test)]
464mod test {
465    use throttle::NoThrottle;
466
467    use super::*;
468
469    use crate::{
470        api_defaults::REST_API_MAX_PAGES,
471        backoff::Exponential,
472        cache,
473        io::{Page, PageHeader},
474        test::utils::{ConfigMock, MockRunner, MockThrottler},
475    };
476
477    fn header_processor_next_page_no_last() -> Rc<Option<PageHeader>> {
478        let mut page_header = PageHeader::new();
479        page_header.set_next_page(Page::new("http://localhost?page=2", 1));
480        Rc::new(Some(page_header))
481    }
482
483    fn header_processor_last_page_no_next() -> Rc<Option<PageHeader>> {
484        let mut page_header = PageHeader::new();
485        page_header.set_last_page(Page::new("http://localhost?page=2", 1));
486        Rc::new(Some(page_header))
487    }
488
489    fn response_with_next_page() -> HttpResponse {
490        let mut headers = Headers::new();
491        headers.set("link".to_string(), "http://localhost?page=2".to_string());
492        let flow_control_headers =
493            FlowControlHeaders::new(header_processor_next_page_no_last(), Rc::new(None));
494        let response = HttpResponse::builder()
495            .status(200)
496            .headers(headers)
497            .flow_control_headers(flow_control_headers)
498            .build()
499            .unwrap();
500        response
501    }
502
503    fn response_with_last_page() -> HttpResponse {
504        let mut headers = Headers::new();
505        headers.set("link".to_string(), "http://localhost?page=2".to_string());
506        let flow_control_headers =
507            FlowControlHeaders::new(header_processor_last_page_no_next(), Rc::new(None));
508        let response = HttpResponse::builder()
509            .status(200)
510            .headers(headers)
511            .flow_control_headers(flow_control_headers)
512            .build()
513            .unwrap();
514        response
515    }
516
517    fn cached_response_next_page() -> HttpResponse {
518        let mut headers = Headers::new();
519        headers.set("link".to_string(), "http://localhost?page=2".to_string());
520        let flow_control_headers =
521            FlowControlHeaders::new(header_processor_next_page_no_last(), Rc::new(None));
522        let response = HttpResponse::builder()
523            .status(200)
524            .headers(headers)
525            .flow_control_headers(flow_control_headers)
526            .local_cache(true)
527            .build()
528            .unwrap();
529        response
530    }
531
532    fn cached_response_last_page() -> HttpResponse {
533        let mut headers = Headers::new();
534        headers.set("link".to_string(), "http://localhost?page=2".to_string());
535        let flow_control_headers =
536            FlowControlHeaders::new(header_processor_last_page_no_next(), Rc::new(None));
537        let response = HttpResponse::builder()
538            .status(200)
539            .headers(headers)
540            .flow_control_headers(flow_control_headers)
541            .local_cache(true)
542            .build()
543            .unwrap();
544        response
545    }
546
547    #[test]
548    fn test_paginator_no_headers_no_next_no_last_pages() {
549        let response = HttpResponse::builder().status(200).build().unwrap();
550        let client = Arc::new(MockRunner::new(vec![response]));
551        let request: Request<()> = Request::new("http://localhost", Method::GET);
552        let throttler: Box<dyn ThrottleStrategy> = Box::new(NoThrottle::default());
553        let backoff = Backoff::new(
554            &client,
555            0,
556            60,
557            time::now_epoch_seconds,
558            Box::new(Exponential),
559            Box::new(throttle::DynamicFixed),
560        );
561        let paginator = Paginator::new(&client, request, "http://localhost", backoff, throttler);
562        let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
563        assert_eq!(1, responses.len());
564        assert_eq!("http://localhost", *client.url());
565    }
566
567    #[test]
568    fn test_paginator_with_link_headers_one_next_and_no_last_pages() {
569        let response1 = response_with_next_page();
570        let mut headers = Headers::new();
571        headers.set("link".to_string(), "http://localhost?page=2".to_string());
572        let response2 = HttpResponse::builder()
573            .status(200)
574            .headers(headers)
575            .build()
576            .unwrap();
577        let client = Arc::new(MockRunner::new(vec![response2, response1]));
578        let request: Request<()> = Request::new("http://localhost", Method::GET);
579        let throttler: Box<dyn ThrottleStrategy> = Box::new(NoThrottle::default());
580        let backoff = Backoff::new(
581            &client,
582            0,
583            60,
584            time::now_epoch_seconds,
585            Box::new(Exponential),
586            Box::new(throttle::DynamicFixed),
587        );
588        let paginator = Paginator::new(&client, request, "http://localhost", backoff, throttler);
589        let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
590        assert_eq!(2, responses.len());
591    }
592
593    #[test]
594    fn test_paginator_with_link_headers_one_next_and_one_last_pages() {
595        let response1 = response_with_next_page();
596        let response2 = response_with_last_page();
597        let client = Arc::new(MockRunner::new(vec![response2, response1]));
598        let request: Request<()> = Request::new("http://localhost", Method::GET);
599        let throttler: Box<dyn ThrottleStrategy> = Box::new(NoThrottle::default());
600        let backoff = Backoff::new(
601            &client,
602            0,
603            60,
604            time::now_epoch_seconds,
605            Box::new(Exponential),
606            Box::new(throttle::DynamicFixed),
607        );
608        let paginator = Paginator::new(&client, request, "http://localhost", backoff, throttler);
609        let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
610        assert_eq!(2, responses.len());
611    }
612
613    #[test]
614    fn test_paginator_error_response() {
615        let response = HttpResponse::builder()
616            .status(500)
617            .body("Internal Server Error".to_string())
618            .build()
619            .unwrap();
620        let client = Arc::new(MockRunner::new(vec![response]));
621        let request: Request<()> = Request::new("http://localhost", Method::GET);
622        let throttler: Box<dyn ThrottleStrategy> = Box::new(NoThrottle::default());
623        let backoff = Backoff::new(
624            &client,
625            0,
626            60,
627            time::now_epoch_seconds,
628            Box::new(Exponential),
629            Box::new(throttle::DynamicFixed),
630        );
631        let paginator = Paginator::new(&client, request, "http://localhost", backoff, throttler);
632        let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
633        assert_eq!(1, responses.len());
634        assert_eq!("http://localhost", *client.url());
635    }
636
637    #[test]
638    fn test_client_get_api_max_pages() {
639        let config = Arc::new(ConfigMock::new(1));
640        let runner = Client::new(cache::NoCache, config, false);
641        let cmd: Request<()> =
642            Request::new("http://localhost", Method::GET).with_api_operation(ApiOperation::Project);
643        assert_eq!(1, runner.api_max_pages(&cmd))
644    }
645
646    #[test]
647    fn test_paginator_stops_paging_after_api_max_pages_reached() {
648        let response1 = response_with_next_page();
649        let response2 = response_with_next_page();
650        let response3 = response_with_last_page();
651        // setup client with max pages set to 1
652        let client = Arc::new(
653            MockRunner::new(vec![response3, response2, response1]).with_config(ConfigMock::new(1)),
654        );
655        let request: Request<()> = Request::new("http://localhost", Method::GET);
656        let throttler: Box<dyn ThrottleStrategy> = Box::new(NoThrottle::default());
657        let backoff = Backoff::new(
658            &client,
659            0,
660            60,
661            time::now_epoch_seconds,
662            Box::new(Exponential),
663            Box::new(throttle::DynamicFixed),
664        );
665        let paginator = Paginator::new(&client, request, "http://localhost", backoff, throttler);
666        let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
667        assert_eq!(1, responses.len());
668    }
669
670    #[test]
671    fn test_paginator_limits_to_max_pages_default() {
672        let api_max_pages = REST_API_MAX_PAGES + 5;
673        let mut responses = Vec::new();
674        for _ in 0..api_max_pages {
675            let response = response_with_next_page();
676            responses.push(response);
677        }
678        let last_response = response_with_last_page();
679        responses.push(last_response);
680        responses.reverse();
681        let request: Request<()> = Request::new("http://localhost", Method::GET);
682        let client = Arc::new(MockRunner::new(responses));
683        let throttler: Box<dyn ThrottleStrategy> = Box::new(NoThrottle::default());
684        let backoff = Backoff::new(
685            &client,
686            0,
687            60,
688            time::now_epoch_seconds,
689            Box::new(Exponential),
690            Box::new(throttle::DynamicFixed),
691        );
692        let paginator = Paginator::new(&client, request, "http://localhost", backoff, throttler);
693        let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
694        assert_eq!(REST_API_MAX_PAGES, responses.len() as u32);
695    }
696
697    #[test]
698    fn test_ratelimit_remaining_threshold_reached_is_error() {
699        let mut headers = Headers::new();
700        headers.set("x-ratelimit-remaining".to_string(), "10".to_string());
701        let flow_control_headers = FlowControlHeaders::new(
702            Rc::new(None),
703            Rc::new(Some(RateLimitHeader::new(
704                10,
705                Seconds::new(60),
706                Seconds::new(60),
707            ))),
708        );
709        let mut response = HttpResponse::builder()
710            .status(200)
711            .headers(headers)
712            .flow_control_headers(flow_control_headers)
713            .build()
714            .unwrap();
715        let client = Client::new(cache::NoCache, Arc::new(ConfigMock::new(1)), false);
716        assert!(client.handle_rate_limit(&mut response).is_err());
717    }
718
719    #[test]
720    fn test_ratelimit_remaining_threshold_not_reached_is_ok() {
721        let mut headers = Headers::new();
722        headers.set("ratelimit-remaining".to_string(), "11".to_string());
723        let mut response = HttpResponse::builder()
724            .status(200)
725            .headers(headers)
726            .build()
727            .unwrap();
728        let client = Client::new(cache::NoCache, Arc::new(ConfigMock::new(1)), false);
729        assert!(client.handle_rate_limit(&mut response).is_ok());
730    }
731
732    fn epoch_seconds_now_mock(secs: u64) -> Seconds {
733        Seconds::new(secs)
734    }
735
736    #[test]
737    fn test_remaining_requests_below_threshold_all_fail() {
738        // remaining requests - below threshold of 10 (api_defaults)
739        let remaining_requests = Arc::new(Mutex::new(4));
740        // 10 seconds before we reset counter to 80 (api_defaults)
741        let time_to_ratelimit_reset = Arc::new(Mutex::new(Seconds::new(10)));
742        let now = || -> Seconds { epoch_seconds_now_mock(1) };
743        let config: Arc<dyn ConfigProperties> = Arc::new(ConfigMock::new(1));
744
745        // counter will never get reset - all requests will fail
746        let mut threads = Vec::new();
747        for _ in 0..10 {
748            let remaining_requests = remaining_requests.clone();
749            let time_to_ratelimit_reset = time_to_ratelimit_reset.clone();
750            let config = config.clone();
751            threads.push(std::thread::spawn(move || {
752                let mut response = HttpResponse::builder().status(200).build().unwrap();
753                let result = default_rate_limit_handler(
754                    &mut response,
755                    &config,
756                    &time_to_ratelimit_reset,
757                    &remaining_requests,
758                    now,
759                );
760                assert!(result.is_err());
761            }));
762        }
763        for thread in threads {
764            thread.join().unwrap();
765        }
766    }
767
768    #[test]
769    fn test_time_to_reset_achieved_resets_counter_all_ok() {
770        // one remaining request before we hit threshold
771        let remaining_requests = Arc::new(Mutex::new(11));
772        let time_to_ratelimit_reset = Arc::new(Mutex::new(Seconds::new(1)));
773        // now > time_to_ratelimit_reset - counter will be reset to 80
774        let now = || -> Seconds { epoch_seconds_now_mock(2) };
775        let config: Arc<dyn ConfigProperties> = Arc::new(ConfigMock::new(1));
776
777        let mut threads = Vec::new();
778        // 70 parallel requests - remaining 11.
779        // On first request, will reset total number to 81, then decrease by 1.\
780        // having 80 left to process with a threshold of 10, then the remaining
781        // 69 will be processed. Time to reset will be set to 62
782        // If we had 71, then the last one would fail.
783        for _ in 0..70 {
784            let remaining_requests = remaining_requests.clone();
785            let time_to_ratelimit_reset = time_to_ratelimit_reset.clone();
786            let config = config.clone();
787            threads.push(std::thread::spawn(move || {
788                let mut response = HttpResponse::builder().status(200).build().unwrap();
789                let result = default_rate_limit_handler(
790                    &mut response,
791                    &config,
792                    &time_to_ratelimit_reset,
793                    &remaining_requests,
794                    now,
795                );
796                assert!(result.is_ok());
797            }));
798        }
799        for thread in threads {
800            thread.join().unwrap();
801        }
802    }
803
804    #[test]
805    fn test_paginator_stops_paging_after_http_request_max_pages_reached() {
806        let response1 = response_with_next_page();
807        let response2 = response_with_next_page();
808        let response3 = response_with_last_page();
809        let client = Arc::new(MockRunner::new(vec![response3, response2, response1]));
810        let request: Request<()> = Request::builder()
811            .method(Method::GET)
812            .resource(Resource::new("http://localhost", None))
813            .max_pages(1)
814            .build()
815            .unwrap();
816        let throttler: Box<dyn ThrottleStrategy> = Box::new(NoThrottle::default());
817        let backoff = Backoff::new(
818            &client,
819            0,
820            60,
821            time::now_epoch_seconds,
822            Box::new(Exponential),
823            Box::new(throttle::DynamicFixed),
824        );
825        let paginator = Paginator::new(&client, request, "http://localhost", backoff, throttler);
826        let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
827        assert_eq!(1, responses.len());
828    }
829
830    #[test]
831    fn test_paginator_fixed_throttle_enabled() {
832        let response1 = response_with_next_page();
833        let response2 = response_with_next_page();
834        let response3 = response_with_last_page();
835        let client = Arc::new(MockRunner::new(vec![response3, response2, response1]));
836        let request: Request<()> = Request::new("http://localhost", Method::GET);
837        let throttler = Rc::new(MockThrottler::new(None));
838        let bthrottler: Box<dyn ThrottleStrategy> = Box::new(Rc::clone(&throttler));
839        let backoff = Backoff::new(
840            &client,
841            0,
842            60,
843            time::now_epoch_seconds,
844            Box::new(Exponential),
845            Box::new(throttle::DynamicFixed),
846        );
847        let paginator = Paginator::new(&client, request, "http://localhost", backoff, bthrottler);
848        let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
849        assert_eq!(3, responses.len());
850        assert_eq!(2, *throttler.throttled());
851    }
852
853    #[test]
854    fn test_paginator_range_throttle_enabled() {
855        let response1 = response_with_next_page();
856        let response2 = response_with_last_page();
857        let client = Arc::new(MockRunner::new(vec![response2, response1]));
858        let request: Request<()> = Request::new("http://localhost", Method::GET);
859        let throttler = Rc::new(MockThrottler::new(None));
860        let bthrottler: Box<dyn ThrottleStrategy> = Box::new(Rc::clone(&throttler));
861        let backoff = Backoff::new(
862            &client,
863            0,
864            60,
865            time::now_epoch_seconds,
866            Box::new(Exponential),
867            Box::new(throttle::DynamicFixed),
868        );
869        let paginator = Paginator::new(&client, request, "http://localhost", backoff, bthrottler);
870        let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
871        assert_eq!(2, responses.len());
872        assert_eq!(1, *throttler.throttled());
873    }
874
875    #[test]
876    fn test_paginator_no_throttle_if_response_is_from_local_cache() {
877        let response1 = cached_response_next_page();
878        let response2 = cached_response_next_page();
879        let response3 = cached_response_last_page();
880        let client = Arc::new(MockRunner::new(vec![response3, response2, response1]));
881        let request: Request<()> = Request::new("http://localhost", Method::GET);
882        let throttler = Rc::new(MockThrottler::new(None));
883        let bthrottler: Box<dyn ThrottleStrategy> = Box::new(Rc::clone(&throttler));
884        let backoff = Backoff::new(
885            &client,
886            0,
887            60,
888            time::now_epoch_seconds,
889            Box::new(Exponential),
890            Box::new(throttle::DynamicFixed),
891        );
892        let paginator = Paginator::new(&client, request, "http://localhost", backoff, bthrottler);
893        let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
894        assert_eq!(3, responses.len());
895        assert_eq!(0, *throttler.throttled());
896    }
897
898    #[test]
899    fn test_user_request_from_up_to_pages_takes_over_max_api_pages() {
900        let mut responses = Vec::new();
901        for _ in 0..4 {
902            let response = response_with_next_page();
903            responses.push(response);
904        }
905        let last_response = response_with_last_page();
906        responses.push(last_response);
907        responses.reverse();
908        // config api max pages 2
909        let client = Arc::new(MockRunner::new(responses).with_config(ConfigMock::new(2)));
910        let request: Request<()> = Request::builder()
911            .method(Method::GET)
912            .resource(Resource::new("http://localhost", None))
913            // User requests 5 pages
914            .max_pages(5)
915            .build()
916            .unwrap();
917        let backoff = Backoff::new(
918            &client,
919            0,
920            60,
921            time::now_epoch_seconds,
922            Box::new(Exponential),
923            Box::new(throttle::DynamicFixed),
924        );
925        let throttler: Box<dyn ThrottleStrategy> = Box::new(NoThrottle::default());
926        let paginator = Paginator::new(&client, request, "http://localhost", backoff, throttler);
927        let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
928        assert_eq!(5, responses.len());
929    }
930
931    #[test]
932    fn test_paginator_auto_throttle_enabled_after_autorate_engage_threshold() {
933        let response1 = response_with_next_page();
934        let response2 = response_with_next_page();
935        let response3 = response_with_next_page();
936        // Throttles in next two requests
937        let response4 = response_with_next_page();
938        let response5 = response_with_last_page();
939        let client = Arc::new(MockRunner::new(vec![
940            response5, response4, response3, response2, response1,
941        ]));
942        let request: Request<()> = Request::builder()
943            .method(Method::GET)
944            .resource(Resource::new("http://localhost", None))
945            .max_pages(5)
946            .build()
947            .unwrap();
948        let throttler = Rc::new(MockThrottler::new(Some(
949            throttle::ThrottleStrategyType::AutoRate,
950        )));
951        let bthrottler: Box<dyn ThrottleStrategy> = Box::new(Rc::clone(&throttler));
952        let backoff = Backoff::new(
953            &client,
954            0,
955            60,
956            time::now_epoch_seconds,
957            Box::new(Exponential),
958            Box::new(throttle::DynamicFixed),
959        );
960        let paginator = Paginator::new(&client, request, "http://localhost", backoff, bthrottler);
961        let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
962        assert_eq!(5, responses.len());
963        assert_eq!(2, *throttler.throttled());
964    }
965}