gr/
http.rs

1pub mod throttle;
2
3use crate::api_traits::ApiOperation;
4use crate::backoff::Backoff;
5use crate::cache::{Cache, CacheState};
6use crate::config::ConfigProperties;
7use crate::error::GRError;
8use crate::io::{
9    parse_page_headers, parse_ratelimit_headers, FlowControlHeaders, HttpResponse, HttpRunner,
10    RateLimitHeader, ResponseField,
11};
12use crate::time::{self, now_epoch_seconds, Seconds};
13use crate::{api_defaults, error, log_debug, log_error};
14use crate::{log_info, Result};
15use serde::{Deserialize, Serialize};
16use std::borrow::Borrow;
17use std::collections::{hash_map, HashMap};
18use std::iter::Iterator;
19use std::rc::Rc;
20use std::sync::{Arc, Mutex};
21use throttle::{ThrottleStrategy, ThrottleStrategyType};
22
/// HTTP client that sends requests through a `ureq` agent and carries the
/// state needed for response caching and client-side rate limiting.
pub struct Client<C> {
    /// Cache backend consulted and updated for `GET` requests.
    cache: C,
    /// Configuration source (rate limit threshold, per-operation page limits).
    config: Arc<dyn ConfigProperties>,
    /// When `true`, a fresh cache entry is not returned directly; the remote
    /// is queried again.
    refresh_cache: bool,
    /// Epoch time at which the fallback request budget is reset.
    time_to_ratelimit_reset: Mutex<Seconds>,
    /// Requests left in the current fallback window; only used when the
    /// remote sends no rate limit headers.
    remaining_requests: Mutex<u32>,
    /// Agent that performs the actual HTTP calls.
    http_agent: ureq::Agent,
}
31
32// TODO: provide builder pattern for Client.
33impl<C> Client<C> {
34    pub fn new(cache: C, config: Arc<dyn ConfigProperties>, refresh_cache: bool) -> Self {
35        let remaining_requests = Mutex::new(api_defaults::DEFAULT_NUMBER_REQUESTS_MINUTE);
36        let time_to_ratelimit_reset = Mutex::new(now_epoch_seconds() + Seconds::new(60));
37        let http_config = ureq::Agent::config_builder()
38            // Keeps same functionality as the ureq 2.x default.
39            // that is we handle the response as normal when error codes such as
40            // 4xx, 5xx are returned.
41            .http_status_as_error(false)
42            .build();
43        Client {
44            cache,
45            refresh_cache,
46            config,
47            time_to_ratelimit_reset,
48            remaining_requests,
49            http_agent: http_config.into(),
50        }
51    }
52
53    fn submit<T: Serialize>(&self, request: &Request<T>) -> Result<HttpResponse> {
54        let response = match request.method {
55            Method::GET => {
56                let req = self.http_agent.get(request.url());
57                let req = Self::add_headers(req, request.headers());
58                req.call()
59            }
60            Method::HEAD => {
61                let req = self.http_agent.head(request.url());
62                let req = Self::add_headers(req, request.headers());
63                req.call()
64            }
65            Method::POST => {
66                let req = self.http_agent.post(request.url());
67                let req = Self::add_headers(req, request.headers());
68                req.send_json(serde_json::to_value(request.body).unwrap())
69            }
70            Method::PATCH => {
71                let req = self.http_agent.patch(request.url());
72                let req = Self::add_headers(req, request.headers());
73                req.send_json(serde_json::to_value(request.body).unwrap())
74            }
75            Method::PUT => {
76                let req = self.http_agent.put(request.url());
77                let req = Self::add_headers(req, request.headers());
78                req.send_json(serde_json::to_value(request.body).unwrap())
79            }
80        };
81
82        match response {
83            Ok(response) => {
84                let status = response.status();
85                // Grab headers for pagination and cache.
86                let headers =
87                    response
88                        .headers()
89                        .iter()
90                        .fold(Headers::new(), |mut headers, (name, value)| {
91                            headers.set::<String, String>(
92                                name.to_string(),
93                                value.to_str().unwrap().to_string(),
94                            );
95                            headers
96                        });
97                let rate_limit_header = Rc::new(parse_ratelimit_headers(Some(&headers)));
98                let page_header = Rc::new(parse_page_headers(Some(&headers)));
99                let flow_control_headers = FlowControlHeaders::new(page_header, rate_limit_header);
100                // log debug response headers
101                log_debug!("Response headers: {:?}", headers);
102                let mut body = response.into_body();
103                let mut response = HttpResponse::builder()
104                    .status(status.as_u16() as i32)
105                    .headers(headers)
106                    .body(body.read_to_string().unwrap_or_default())
107                    .flow_control_headers(flow_control_headers)
108                    .build()
109                    .unwrap();
110
111                self.handle_rate_limit(&mut response)?;
112                Ok(response)
113            }
114            Err(err) => Err(GRError::HttpTransportError(err.to_string()).into()),
115        }
116    }
117
118    fn add_headers<B>(
119        mut builder: ureq::RequestBuilder<B>,
120        headers: &Headers,
121    ) -> ureq::RequestBuilder<B> {
122        for (key, value) in headers.iter() {
123            builder = builder.header(key, value);
124        }
125        builder
126    }
127}
128
129impl<C> Client<C> {
130    fn handle_rate_limit(&self, response: &mut HttpResponse) -> Result<()> {
131        if let Some(headers) = response.get_ratelimit_headers().borrow() {
132            if headers.remaining <= self.config.rate_limit_remaining_threshold() {
133                log_error!("Rate limit threshold reached");
134                return Err(error::GRError::RateLimitExceeded(*headers).into());
135            }
136            Ok(())
137        } else {
138            // The remote does not provide rate limit headers, so we apply our
139            // defaults for safety. Official github.com and gitlab.com do, so
140            // that could be an internal/dev, etc... instance setup without rate
141            // limits.
142            log_info!("Rate limit headers not provided by remote, using defaults");
143            default_rate_limit_handler(
144                response,
145                &self.config,
146                &self.time_to_ratelimit_reset,
147                &self.remaining_requests,
148                now_epoch_seconds,
149            )
150        }
151    }
152}
153
154fn default_rate_limit_handler(
155    response: &mut HttpResponse,
156    config: &Arc<dyn ConfigProperties>,
157    time_to_ratelimit_reset: &Mutex<Seconds>,
158    remaining_requests: &Mutex<u32>,
159    now_epoch_seconds: fn() -> Seconds,
160) -> Result<()> {
161    // bail if we are below the security threshold for remaining requests
162
163    if let Ok(remaining_requests) = remaining_requests.lock() {
164        if *remaining_requests <= config.rate_limit_remaining_threshold() {
165            let time_to_ratelimit_reset =
166                *time_to_ratelimit_reset.lock().unwrap() - now_epoch_seconds();
167            log_error!(
168                "Remote does not provide rate limit headers, \
169                            so the default rate limit of {} per minute has been \
170                            exceeded. Try again in {} seconds",
171                api_defaults::DEFAULT_NUMBER_REQUESTS_MINUTE,
172                time_to_ratelimit_reset
173            );
174            let rate_limit_header = RateLimitHeader::new(
175                *remaining_requests,
176                time_to_ratelimit_reset,
177                time_to_ratelimit_reset,
178            );
179            response.update_rate_limit_headers(rate_limit_header);
180            return Err(error::GRError::RateLimitExceeded(RateLimitHeader::new(
181                *remaining_requests,
182                time_to_ratelimit_reset,
183                time_to_ratelimit_reset,
184            ))
185            .into());
186        }
187    } else {
188        return Err(error::GRError::ApplicationError(
189            "http module rate limiting - Cannot read remaining \
190                    http requests"
191                .to_string(),
192        )
193        .into());
194    }
195
196    if let Ok(mut remaining_requests) = remaining_requests.lock() {
197        // if elapsed time is greater than 60 seconds, then reset
198        // remaining requests to default
199        let current_time = now_epoch_seconds();
200        let mut time_to_reset = time_to_ratelimit_reset.lock().unwrap();
201        if current_time > *time_to_reset {
202            *remaining_requests = api_defaults::DEFAULT_NUMBER_REQUESTS_MINUTE;
203            *time_to_reset = current_time + Seconds::new(60);
204        }
205        *remaining_requests -= 1;
206        // Using time to seconds relative as counter gets reset every minute.
207        log_info!(
208            "Remaining requests: {}, reset in: {} seconds",
209            *remaining_requests,
210            time::epoch_to_seconds_relative(*time_to_reset)
211        );
212    } else {
213        return Err(error::GRError::ApplicationError(
214            "http module rate limiting - Cannot decrease counter \
215                        number of requests pending"
216                .to_string(),
217        )
218        .into());
219    }
220
221    Ok(())
222}
223
224#[derive(Default)]
225pub struct Resource {
226    pub url: String,
227    pub api_operation: Option<ApiOperation>,
228}
229
230impl Resource {
231    pub fn new(url: &str, api_operation: Option<ApiOperation>) -> Self {
232        Resource {
233            url: url.to_string(),
234            api_operation,
235        }
236    }
237}
238
239#[derive(Serialize, Clone, Debug, Default)]
240pub struct Body<T>(HashMap<String, T>);
241
242impl<T> Body<T> {
243    pub fn new() -> Self {
244        Body(HashMap::new())
245    }
246
247    pub fn add<K: Into<String>>(&mut self, key: K, value: T) {
248        self.0.insert(key.into(), value);
249    }
250}
251
252#[derive(Clone, Debug, Default, Serialize, Deserialize)]
253pub struct Headers(HashMap<String, String>);
254
255impl Headers {
256    pub fn new() -> Self {
257        Headers(HashMap::new())
258    }
259
260    pub fn set<K: Into<String>, V: Into<String>>(&mut self, key: K, value: V) {
261        self.0.insert(key.into(), value.into());
262    }
263
264    pub fn get(&self, key: &str) -> Option<&String> {
265        self.0.get(key)
266    }
267
268    pub fn iter(&self) -> hash_map::Iter<'_, String, String> {
269        self.0.iter()
270    }
271
272    pub fn extend(&mut self, headers: Headers) {
273        for (key, value) in headers.iter() {
274            self.0.insert(key.clone(), value.clone());
275        }
276    }
277}
278
279#[derive(Builder)]
280#[builder(pattern = "owned")]
281pub struct Request<'a, T> {
282    #[builder(setter(into, strip_option), default)]
283    pub body: Option<&'a Body<T>>,
284    #[builder(default)]
285    headers: Headers,
286    pub method: Method,
287    pub resource: Resource,
288    #[builder(setter(into, strip_option), default)]
289    pub max_pages: Option<i64>,
290}
291
292impl<'a, T> Request<'a, T> {
293    pub fn builder() -> RequestBuilder<'a, T> {
294        RequestBuilder::default()
295    }
296
297    pub fn new(url: &str, method: Method) -> Self {
298        Request {
299            body: None,
300            headers: Headers::new(),
301            method,
302            resource: Resource::new(url, None),
303            max_pages: None,
304        }
305    }
306
307    pub fn set_max_pages(&mut self, max_pages: i64) {
308        self.max_pages = Some(max_pages);
309    }
310
311    pub fn with_api_operation(mut self, api_operation: ApiOperation) -> Self {
312        self.resource.api_operation = Some(api_operation);
313        self
314    }
315
316    pub fn api_operation(&self) -> &Option<ApiOperation> {
317        &self.resource.api_operation
318    }
319
320    pub fn set_header(&mut self, key: &str, value: &str) {
321        self.headers.set(key.to_string(), value.to_string());
322    }
323
324    pub fn set_headers(&mut self, headers: Headers) {
325        self.headers = headers;
326    }
327
328    pub fn set_url(&mut self, url: &str) {
329        self.resource.url = url.to_string();
330    }
331
332    pub fn url(&self) -> &str {
333        &self.resource.url
334    }
335
336    pub fn headers(&self) -> &Headers {
337        &self.headers
338    }
339}
340
/// HTTP methods supported by the client.
///
/// The type is a plain fieldless enum, so it is `Copy`: it can be passed and
/// compared by value without cloning. `HEAD` is the default.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum Method {
    #[default]
    HEAD,
    GET,
    POST,
    PUT,
    PATCH,
}
350
351impl<C: Cache<Resource>> HttpRunner for Client<C> {
352    type Response = HttpResponse;
353
354    fn run<T: Serialize>(&self, cmd: &mut Request<T>) -> Result<Self::Response> {
355        match cmd.method {
356            Method::GET => {
357                let mut default_response = HttpResponse::builder().build().unwrap();
358                match self.cache.get(&cmd.resource) {
359                    Ok(CacheState::Fresh(mut response)) => {
360                        log_debug!("Cache fresh for {}", cmd.resource.url);
361                        if !self.refresh_cache {
362                            log_debug!("Returning local cached response");
363                            response.local_cache = true;
364                            return Ok(response);
365                        }
366                        default_response = response;
367                    }
368                    Ok(CacheState::Stale(response)) => {
369                        log_debug!("Cache stale for {}", cmd.resource.url);
370                        default_response = response;
371                    }
372                    Ok(CacheState::None) => {}
373                    Err(err) => return Err(err),
374                }
375                // check ETag is available in the default response.
376                // If so, then we need to set the If-None-Match header.
377                if let Some(etag) = default_response.get_etag() {
378                    cmd.set_header("If-None-Match", etag);
379                }
380                // If status is 304, then we need to return the cached response.
381                let response = self.submit(cmd)?;
382                if response.status == 304 {
383                    // Update cache with latest headers. This effectively
384                    // refreshes the cache and we won't hit this until per api
385                    // cache expiration as declared in the config.
386                    self.cache
387                        .update(&cmd.resource, &response, &ResponseField::Headers)?;
388                    return Ok(default_response);
389                }
390                self.cache.set(&cmd.resource, &response).unwrap();
391                Ok(response)
392            }
393            _ => Ok(self.submit(cmd)?),
394        }
395    }
396
397    fn api_max_pages<T: Serialize>(&self, cmd: &Request<T>) -> u32 {
398        let max_pages = self
399            .config
400            .get_max_pages(cmd.resource.api_operation.as_ref().unwrap());
401        max_pages
402    }
403}
404
405pub struct Paginator<'a, R, T> {
406    request: Request<'a, T>,
407    page_url: Option<String>,
408    iter: u32,
409    backoff: Backoff<'a, R>,
410    throttler: Box<dyn ThrottleStrategy>,
411    max_pages: u32,
412}
413
414impl<'a, R: HttpRunner, T: Serialize> Paginator<'a, R, T> {
415    pub fn new(
416        runner: &'a Arc<R>,
417        request: Request<'a, T>,
418        page_url: &str,
419        backoff: Backoff<'a, R>,
420        throttle_strategy: Box<dyn ThrottleStrategy>,
421    ) -> Self {
422        let max_pages = if let Some(max_pages) = request.max_pages {
423            max_pages as u32
424        } else {
425            runner.api_max_pages(&request)
426        };
427        Paginator {
428            request,
429            page_url: Some(page_url.to_string()),
430            iter: 0,
431            backoff,
432            throttler: throttle_strategy,
433            max_pages,
434        }
435    }
436}
437
438impl<T: Serialize, R: HttpRunner<Response = HttpResponse>> Iterator for Paginator<'_, R, T> {
439    type Item = Result<HttpResponse>;
440
441    fn next(&mut self) -> Option<Self::Item> {
442        if let Some(page_url) = &self.page_url {
443            if self.iter >= self.max_pages {
444                return None;
445            }
446            if self.iter >= 1 {
447                self.request.set_url(page_url);
448            }
449            log_info!("Requesting page: {}", self.iter + 1);
450            log_info!("URL: {}", self.request.url());
451
452            let response = match self.backoff.retry_on_error(&mut self.request) {
453                Ok(response) => {
454                    if let Some(page_headers) = response.get_page_headers().borrow() {
455                        match (page_headers.next_page(), page_headers.last_page()) {
456                            (Some(next), _) => self.page_url = Some(next.url().to_string()),
457                            (None, _) => self.page_url = None,
458                        }
459                    } else {
460                        self.page_url = None;
461                    };
462                    Ok(response)
463                }
464                Err(err) => {
465                    self.page_url = None;
466                    Err(err)
467                }
468            };
469            self.iter += 1;
470            if self.iter < self.max_pages && self.page_url.is_some() {
471                let response = response.as_ref().unwrap();
472                // Technically no need to check ok on response, as page_url is Some
473                // (response was Ok)
474                if !response.local_cache {
475                    if self.throttler.strategy() == ThrottleStrategyType::AutoRate {
476                        if self.iter >= api_defaults::ENGAGE_AUTORATE_THROTTLING_THRESHOLD {
477                            self.throttler
478                                .throttle(Some(response.get_flow_control_headers()));
479                        }
480                    } else {
481                        self.throttler
482                            .throttle(Some(response.get_flow_control_headers()));
483                    }
484                }
485            }
486            return Some(response);
487        }
488        None
489    }
490}
491
492#[cfg(test)]
493mod test {
494    use throttle::NoThrottle;
495
496    use super::*;
497
498    use crate::{
499        api_defaults::REST_API_MAX_PAGES,
500        backoff::Exponential,
501        cache,
502        io::{Page, PageHeader},
503        test::utils::{ConfigMock, MockRunner, MockThrottler},
504    };
505
506    fn header_processor_next_page_no_last() -> Rc<Option<PageHeader>> {
507        let mut page_header = PageHeader::new();
508        page_header.set_next_page(Page::new("http://localhost?page=2", 1));
509        Rc::new(Some(page_header))
510    }
511
512    fn header_processor_last_page_no_next() -> Rc<Option<PageHeader>> {
513        let mut page_header = PageHeader::new();
514        page_header.set_last_page(Page::new("http://localhost?page=2", 1));
515        Rc::new(Some(page_header))
516    }
517
518    fn response_with_next_page() -> HttpResponse {
519        let mut headers = Headers::new();
520        headers.set("link".to_string(), "http://localhost?page=2".to_string());
521        let flow_control_headers =
522            FlowControlHeaders::new(header_processor_next_page_no_last(), Rc::new(None));
523        let response = HttpResponse::builder()
524            .status(200)
525            .headers(headers)
526            .flow_control_headers(flow_control_headers)
527            .build()
528            .unwrap();
529        response
530    }
531
532    fn response_with_last_page() -> HttpResponse {
533        let mut headers = Headers::new();
534        headers.set("link".to_string(), "http://localhost?page=2".to_string());
535        let flow_control_headers =
536            FlowControlHeaders::new(header_processor_last_page_no_next(), Rc::new(None));
537        let response = HttpResponse::builder()
538            .status(200)
539            .headers(headers)
540            .flow_control_headers(flow_control_headers)
541            .build()
542            .unwrap();
543        response
544    }
545
546    fn cached_response_next_page() -> HttpResponse {
547        let mut headers = Headers::new();
548        headers.set("link".to_string(), "http://localhost?page=2".to_string());
549        let flow_control_headers =
550            FlowControlHeaders::new(header_processor_next_page_no_last(), Rc::new(None));
551        let response = HttpResponse::builder()
552            .status(200)
553            .headers(headers)
554            .flow_control_headers(flow_control_headers)
555            .local_cache(true)
556            .build()
557            .unwrap();
558        response
559    }
560
561    fn cached_response_last_page() -> HttpResponse {
562        let mut headers = Headers::new();
563        headers.set("link".to_string(), "http://localhost?page=2".to_string());
564        let flow_control_headers =
565            FlowControlHeaders::new(header_processor_last_page_no_next(), Rc::new(None));
566        let response = HttpResponse::builder()
567            .status(200)
568            .headers(headers)
569            .flow_control_headers(flow_control_headers)
570            .local_cache(true)
571            .build()
572            .unwrap();
573        response
574    }
575
576    #[test]
577    fn test_paginator_no_headers_no_next_no_last_pages() {
578        let response = HttpResponse::builder().status(200).build().unwrap();
579        let client = Arc::new(MockRunner::new(vec![response]));
580        let request: Request<()> = Request::new("http://localhost", Method::GET);
581        let throttler: Box<dyn ThrottleStrategy> = Box::new(NoThrottle);
582        let backoff = Backoff::new(
583            &client,
584            0,
585            60,
586            time::now_epoch_seconds,
587            Box::new(Exponential),
588            Box::new(throttle::DynamicFixed),
589        );
590        let paginator = Paginator::new(&client, request, "http://localhost", backoff, throttler);
591        let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
592        assert_eq!(1, responses.len());
593        assert_eq!("http://localhost", *client.url());
594    }
595
596    #[test]
597    fn test_paginator_with_link_headers_one_next_and_no_last_pages() {
598        let response1 = response_with_next_page();
599        let mut headers = Headers::new();
600        headers.set("link".to_string(), "http://localhost?page=2".to_string());
601        let response2 = HttpResponse::builder()
602            .status(200)
603            .headers(headers)
604            .build()
605            .unwrap();
606        let client = Arc::new(MockRunner::new(vec![response2, response1]));
607        let request: Request<()> = Request::new("http://localhost", Method::GET);
608        let throttler: Box<dyn ThrottleStrategy> = Box::new(NoThrottle);
609        let backoff = Backoff::new(
610            &client,
611            0,
612            60,
613            time::now_epoch_seconds,
614            Box::new(Exponential),
615            Box::new(throttle::DynamicFixed),
616        );
617        let paginator = Paginator::new(&client, request, "http://localhost", backoff, throttler);
618        let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
619        assert_eq!(2, responses.len());
620    }
621
622    #[test]
623    fn test_paginator_with_link_headers_one_next_and_one_last_pages() {
624        let response1 = response_with_next_page();
625        let response2 = response_with_last_page();
626        let client = Arc::new(MockRunner::new(vec![response2, response1]));
627        let request: Request<()> = Request::new("http://localhost", Method::GET);
628        let throttler: Box<dyn ThrottleStrategy> = Box::new(NoThrottle);
629        let backoff = Backoff::new(
630            &client,
631            0,
632            60,
633            time::now_epoch_seconds,
634            Box::new(Exponential),
635            Box::new(throttle::DynamicFixed),
636        );
637        let paginator = Paginator::new(&client, request, "http://localhost", backoff, throttler);
638        let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
639        assert_eq!(2, responses.len());
640    }
641
642    #[test]
643    fn test_paginator_error_response() {
644        let response = HttpResponse::builder()
645            .status(500)
646            .body("Internal Server Error".to_string())
647            .build()
648            .unwrap();
649        let client = Arc::new(MockRunner::new(vec![response]));
650        let request: Request<()> = Request::new("http://localhost", Method::GET);
651        let throttler: Box<dyn ThrottleStrategy> = Box::new(NoThrottle);
652        let backoff = Backoff::new(
653            &client,
654            0,
655            60,
656            time::now_epoch_seconds,
657            Box::new(Exponential),
658            Box::new(throttle::DynamicFixed),
659        );
660        let paginator = Paginator::new(&client, request, "http://localhost", backoff, throttler);
661        let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
662        assert_eq!(1, responses.len());
663        assert_eq!("http://localhost", *client.url());
664    }
665
666    #[test]
667    fn test_client_get_api_max_pages() {
668        let config = Arc::new(ConfigMock::new(1));
669        let runner = Client::new(cache::NoCache, config, false);
670        let cmd: Request<()> =
671            Request::new("http://localhost", Method::GET).with_api_operation(ApiOperation::Project);
672        assert_eq!(1, runner.api_max_pages(&cmd))
673    }
674
675    #[test]
676    fn test_paginator_stops_paging_after_api_max_pages_reached() {
677        let response1 = response_with_next_page();
678        let response2 = response_with_next_page();
679        let response3 = response_with_last_page();
680        // setup client with max pages set to 1
681        let client = Arc::new(
682            MockRunner::new(vec![response3, response2, response1]).with_config(ConfigMock::new(1)),
683        );
684        let request: Request<()> = Request::new("http://localhost", Method::GET);
685        let throttler: Box<dyn ThrottleStrategy> = Box::new(NoThrottle);
686        let backoff = Backoff::new(
687            &client,
688            0,
689            60,
690            time::now_epoch_seconds,
691            Box::new(Exponential),
692            Box::new(throttle::DynamicFixed),
693        );
694        let paginator = Paginator::new(&client, request, "http://localhost", backoff, throttler);
695        let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
696        assert_eq!(1, responses.len());
697    }
698
699    #[test]
700    fn test_paginator_limits_to_max_pages_default() {
701        let api_max_pages = REST_API_MAX_PAGES + 5;
702        let mut responses = Vec::new();
703        for _ in 0..api_max_pages {
704            let response = response_with_next_page();
705            responses.push(response);
706        }
707        let last_response = response_with_last_page();
708        responses.push(last_response);
709        responses.reverse();
710        let request: Request<()> = Request::new("http://localhost", Method::GET);
711        let client = Arc::new(MockRunner::new(responses));
712        let throttler: Box<dyn ThrottleStrategy> = Box::new(NoThrottle);
713        let backoff = Backoff::new(
714            &client,
715            0,
716            60,
717            time::now_epoch_seconds,
718            Box::new(Exponential),
719            Box::new(throttle::DynamicFixed),
720        );
721        let paginator = Paginator::new(&client, request, "http://localhost", backoff, throttler);
722        let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
723        assert_eq!(REST_API_MAX_PAGES, responses.len() as u32);
724    }
725
726    #[test]
727    fn test_ratelimit_remaining_threshold_reached_is_error() {
728        let mut headers = Headers::new();
729        headers.set("x-ratelimit-remaining".to_string(), "10".to_string());
730        let flow_control_headers = FlowControlHeaders::new(
731            Rc::new(None),
732            Rc::new(Some(RateLimitHeader::new(
733                10,
734                Seconds::new(60),
735                Seconds::new(60),
736            ))),
737        );
738        let mut response = HttpResponse::builder()
739            .status(200)
740            .headers(headers)
741            .flow_control_headers(flow_control_headers)
742            .build()
743            .unwrap();
744        let client = Client::new(cache::NoCache, Arc::new(ConfigMock::new(1)), false);
745        assert!(client.handle_rate_limit(&mut response).is_err());
746    }
747
748    #[test]
749    fn test_ratelimit_remaining_threshold_not_reached_is_ok() {
750        let mut headers = Headers::new();
751        headers.set("ratelimit-remaining".to_string(), "11".to_string());
752        let mut response = HttpResponse::builder()
753            .status(200)
754            .headers(headers)
755            .build()
756            .unwrap();
757        let client = Client::new(cache::NoCache, Arc::new(ConfigMock::new(1)), false);
758        assert!(client.handle_rate_limit(&mut response).is_ok());
759    }
760
761    fn epoch_seconds_now_mock(secs: u64) -> Seconds {
762        Seconds::new(secs)
763    }
764
765    #[test]
766    fn test_remaining_requests_below_threshold_all_fail() {
767        // remaining requests - below threshold of 10 (api_defaults)
768        let remaining_requests = Arc::new(Mutex::new(4));
769        // 10 seconds before we reset counter to 80 (api_defaults)
770        let time_to_ratelimit_reset = Arc::new(Mutex::new(Seconds::new(10)));
771        let now = || -> Seconds { epoch_seconds_now_mock(1) };
772        let config: Arc<dyn ConfigProperties> = Arc::new(ConfigMock::new(1));
773
774        // counter will never get reset - all requests will fail
775        let mut threads = Vec::new();
776        for _ in 0..10 {
777            let remaining_requests = remaining_requests.clone();
778            let time_to_ratelimit_reset = time_to_ratelimit_reset.clone();
779            let config = config.clone();
780            threads.push(std::thread::spawn(move || {
781                let mut response = HttpResponse::builder().status(200).build().unwrap();
782                let result = default_rate_limit_handler(
783                    &mut response,
784                    &config,
785                    &time_to_ratelimit_reset,
786                    &remaining_requests,
787                    now,
788                );
789                assert!(result.is_err());
790            }));
791        }
792        for thread in threads {
793            thread.join().unwrap();
794        }
795    }
796
797    #[test]
798    fn test_time_to_reset_achieved_resets_counter_all_ok() {
799        // one remaining request before we hit threshold
800        let remaining_requests = Arc::new(Mutex::new(11));
801        let time_to_ratelimit_reset = Arc::new(Mutex::new(Seconds::new(1)));
802        // now > time_to_ratelimit_reset - counter will be reset to 80
803        let now = || -> Seconds { epoch_seconds_now_mock(2) };
804        let config: Arc<dyn ConfigProperties> = Arc::new(ConfigMock::new(1));
805
806        let mut threads = Vec::new();
807        // 70 parallel requests - remaining 11.
808        // On first request, will reset total number to 81, then decrease by 1.\
809        // having 80 left to process with a threshold of 10, then the remaining
810        // 69 will be processed. Time to reset will be set to 62
811        // If we had 71, then the last one would fail.
812        for _ in 0..70 {
813            let remaining_requests = remaining_requests.clone();
814            let time_to_ratelimit_reset = time_to_ratelimit_reset.clone();
815            let config = config.clone();
816            threads.push(std::thread::spawn(move || {
817                let mut response = HttpResponse::builder().status(200).build().unwrap();
818                let result = default_rate_limit_handler(
819                    &mut response,
820                    &config,
821                    &time_to_ratelimit_reset,
822                    &remaining_requests,
823                    now,
824                );
825                assert!(result.is_ok());
826            }));
827        }
828        for thread in threads {
829            thread.join().unwrap();
830        }
831    }
832
833    #[test]
834    fn test_paginator_stops_paging_after_http_request_max_pages_reached() {
835        let response1 = response_with_next_page();
836        let response2 = response_with_next_page();
837        let response3 = response_with_last_page();
838        let client = Arc::new(MockRunner::new(vec![response3, response2, response1]));
839        let request: Request<()> = Request::builder()
840            .method(Method::GET)
841            .resource(Resource::new("http://localhost", None))
842            .max_pages(1)
843            .build()
844            .unwrap();
845        let throttler: Box<dyn ThrottleStrategy> = Box::new(NoThrottle);
846        let backoff = Backoff::new(
847            &client,
848            0,
849            60,
850            time::now_epoch_seconds,
851            Box::new(Exponential),
852            Box::new(throttle::DynamicFixed),
853        );
854        let paginator = Paginator::new(&client, request, "http://localhost", backoff, throttler);
855        let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
856        assert_eq!(1, responses.len());
857    }
858
859    #[test]
860    fn test_paginator_fixed_throttle_enabled() {
861        let response1 = response_with_next_page();
862        let response2 = response_with_next_page();
863        let response3 = response_with_last_page();
864        let client = Arc::new(MockRunner::new(vec![response3, response2, response1]));
865        let request: Request<()> = Request::new("http://localhost", Method::GET);
866        let throttler = Rc::new(MockThrottler::new(None));
867        let bthrottler: Box<dyn ThrottleStrategy> = Box::new(Rc::clone(&throttler));
868        let backoff = Backoff::new(
869            &client,
870            0,
871            60,
872            time::now_epoch_seconds,
873            Box::new(Exponential),
874            Box::new(throttle::DynamicFixed),
875        );
876        let paginator = Paginator::new(&client, request, "http://localhost", backoff, bthrottler);
877        let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
878        assert_eq!(3, responses.len());
879        assert_eq!(2, *throttler.throttled());
880    }
881
882    #[test]
883    fn test_paginator_range_throttle_enabled() {
884        let response1 = response_with_next_page();
885        let response2 = response_with_last_page();
886        let client = Arc::new(MockRunner::new(vec![response2, response1]));
887        let request: Request<()> = Request::new("http://localhost", Method::GET);
888        let throttler = Rc::new(MockThrottler::new(None));
889        let bthrottler: Box<dyn ThrottleStrategy> = Box::new(Rc::clone(&throttler));
890        let backoff = Backoff::new(
891            &client,
892            0,
893            60,
894            time::now_epoch_seconds,
895            Box::new(Exponential),
896            Box::new(throttle::DynamicFixed),
897        );
898        let paginator = Paginator::new(&client, request, "http://localhost", backoff, bthrottler);
899        let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
900        assert_eq!(2, responses.len());
901        assert_eq!(1, *throttler.throttled());
902    }
903
904    #[test]
905    fn test_paginator_no_throttle_if_response_is_from_local_cache() {
906        let response1 = cached_response_next_page();
907        let response2 = cached_response_next_page();
908        let response3 = cached_response_last_page();
909        let client = Arc::new(MockRunner::new(vec![response3, response2, response1]));
910        let request: Request<()> = Request::new("http://localhost", Method::GET);
911        let throttler = Rc::new(MockThrottler::new(None));
912        let bthrottler: Box<dyn ThrottleStrategy> = Box::new(Rc::clone(&throttler));
913        let backoff = Backoff::new(
914            &client,
915            0,
916            60,
917            time::now_epoch_seconds,
918            Box::new(Exponential),
919            Box::new(throttle::DynamicFixed),
920        );
921        let paginator = Paginator::new(&client, request, "http://localhost", backoff, bthrottler);
922        let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
923        assert_eq!(3, responses.len());
924        assert_eq!(0, *throttler.throttled());
925    }
926
927    #[test]
928    fn test_user_request_from_up_to_pages_takes_over_max_api_pages() {
929        let mut responses = Vec::new();
930        for _ in 0..4 {
931            let response = response_with_next_page();
932            responses.push(response);
933        }
934        let last_response = response_with_last_page();
935        responses.push(last_response);
936        responses.reverse();
937        // config api max pages 2
938        let client = Arc::new(MockRunner::new(responses).with_config(ConfigMock::new(2)));
939        let request: Request<()> = Request::builder()
940            .method(Method::GET)
941            .resource(Resource::new("http://localhost", None))
942            // User requests 5 pages
943            .max_pages(5)
944            .build()
945            .unwrap();
946        let backoff = Backoff::new(
947            &client,
948            0,
949            60,
950            time::now_epoch_seconds,
951            Box::new(Exponential),
952            Box::new(throttle::DynamicFixed),
953        );
954        let throttler: Box<dyn ThrottleStrategy> = Box::new(NoThrottle);
955        let paginator = Paginator::new(&client, request, "http://localhost", backoff, throttler);
956        let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
957        assert_eq!(5, responses.len());
958    }
959
960    #[test]
961    fn test_paginator_auto_throttle_enabled_after_autorate_engage_threshold() {
962        let response1 = response_with_next_page();
963        let response2 = response_with_next_page();
964        let response3 = response_with_next_page();
965        // Throttles in next two requests
966        let response4 = response_with_next_page();
967        let response5 = response_with_last_page();
968        let client = Arc::new(MockRunner::new(vec![
969            response5, response4, response3, response2, response1,
970        ]));
971        let request: Request<()> = Request::builder()
972            .method(Method::GET)
973            .resource(Resource::new("http://localhost", None))
974            .max_pages(5)
975            .build()
976            .unwrap();
977        let throttler = Rc::new(MockThrottler::new(Some(
978            throttle::ThrottleStrategyType::AutoRate,
979        )));
980        let bthrottler: Box<dyn ThrottleStrategy> = Box::new(Rc::clone(&throttler));
981        let backoff = Backoff::new(
982            &client,
983            0,
984            60,
985            time::now_epoch_seconds,
986            Box::new(Exponential),
987            Box::new(throttle::DynamicFixed),
988        );
989        let paginator = Paginator::new(&client, request, "http://localhost", backoff, bthrottler);
990        let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
991        assert_eq!(5, responses.len());
992        assert_eq!(2, *throttler.throttled());
993    }
994}