1pub mod throttle;
2
3use crate::api_traits::ApiOperation;
4use crate::backoff::Backoff;
5use crate::cache::{Cache, CacheState};
6use crate::config::ConfigProperties;
7use crate::error::GRError;
8use crate::io::{
9 parse_page_headers, parse_ratelimit_headers, FlowControlHeaders, HttpResponse, HttpRunner,
10 RateLimitHeader, ResponseField,
11};
12use crate::time::{self, now_epoch_seconds, Seconds};
13use crate::{api_defaults, error, log_debug, log_error};
14use crate::{log_info, Result};
15use serde::{Deserialize, Serialize};
16use std::borrow::Borrow;
17use std::collections::{hash_map, HashMap};
18use std::iter::Iterator;
19use std::rc::Rc;
20use std::sync::{Arc, Mutex};
21use throttle::{ThrottleStrategy, ThrottleStrategyType};
22use ureq::Error;
23
24pub struct Client<C> {
25 cache: C,
26 config: Arc<dyn ConfigProperties>,
27 refresh_cache: bool,
28 time_to_ratelimit_reset: Mutex<Seconds>,
29 remaining_requests: Mutex<u32>,
30}
31
32impl<C> Client<C> {
34 pub fn new(cache: C, config: Arc<dyn ConfigProperties>, refresh_cache: bool) -> Self {
35 let remaining_requests = Mutex::new(api_defaults::DEFAULT_NUMBER_REQUESTS_MINUTE);
36 let time_to_ratelimit_reset = Mutex::new(now_epoch_seconds() + Seconds::new(60));
37 Client {
38 cache,
39 refresh_cache,
40 config,
41 time_to_ratelimit_reset,
42 remaining_requests,
43 }
44 }
45
46 fn submit<T: Serialize>(&self, request: &Request<T>) -> Result<HttpResponse> {
47 let ureq_req = match request.method {
48 Method::GET => ureq::get(request.url()),
49 Method::HEAD => ureq::head(request.url()),
50 Method::POST => ureq::post(request.url()),
51 Method::PATCH => ureq::patch(request.url()),
52 Method::PUT => ureq::put(request.url()),
53 };
54 let ureq_req = request
55 .headers()
56 .iter()
57 .fold(ureq_req, |req, (key, value)| req.set(key, value));
58 let call = || -> std::result::Result<ureq::Response, ureq::Error> {
59 match request.method {
60 Method::GET | Method::HEAD => ureq_req.call(),
61 _ => ureq_req.send_json(serde_json::to_value(request.body).unwrap()),
62 }
63 };
64 match call() {
65 Ok(response) | Err(Error::Status(_, response)) => {
66 let status = response.status().into();
67 let headers =
69 response
70 .headers_names()
71 .iter()
72 .fold(Headers::new(), |mut headers, name| {
73 headers.set(
74 name.to_lowercase(),
75 response.header(name.as_str()).unwrap().to_string(),
76 );
77 headers
78 });
79 let rate_limit_header = Rc::new(parse_ratelimit_headers(Some(&headers)));
80 let page_header = Rc::new(parse_page_headers(Some(&headers)));
81 let flow_control_headers = FlowControlHeaders::new(page_header, rate_limit_header);
82 log_debug!("Response headers: {:?}", headers);
84 let body = response.into_string().unwrap_or_default();
85 let mut response = HttpResponse::builder()
86 .status(status)
87 .body(body)
88 .headers(headers)
89 .flow_control_headers(flow_control_headers)
90 .build()
91 .unwrap();
92 self.handle_rate_limit(&mut response)?;
93 Ok(response)
94 }
95 Err(err) => Err(GRError::HttpTransportError(err.to_string()).into()),
96 }
97 }
98}
99
100impl<C> Client<C> {
101 fn handle_rate_limit(&self, response: &mut HttpResponse) -> Result<()> {
102 if let Some(headers) = response.get_ratelimit_headers().borrow() {
103 if headers.remaining <= self.config.rate_limit_remaining_threshold() {
104 log_error!("Rate limit threshold reached");
105 return Err(error::GRError::RateLimitExceeded(*headers).into());
106 }
107 Ok(())
108 } else {
109 log_info!("Rate limit headers not provided by remote, using defaults");
114 default_rate_limit_handler(
115 response,
116 &self.config,
117 &self.time_to_ratelimit_reset,
118 &self.remaining_requests,
119 now_epoch_seconds,
120 )
121 }
122 }
123}
124
125fn default_rate_limit_handler(
126 response: &mut HttpResponse,
127 config: &Arc<dyn ConfigProperties>,
128 time_to_ratelimit_reset: &Mutex<Seconds>,
129 remaining_requests: &Mutex<u32>,
130 now_epoch_seconds: fn() -> Seconds,
131) -> Result<()> {
132 if let Ok(remaining_requests) = remaining_requests.lock() {
135 if *remaining_requests <= config.rate_limit_remaining_threshold() {
136 let time_to_ratelimit_reset =
137 *time_to_ratelimit_reset.lock().unwrap() - now_epoch_seconds();
138 log_error!(
139 "Remote does not provide rate limit headers, \
140 so the default rate limit of {} per minute has been \
141 exceeded. Try again in {} seconds",
142 api_defaults::DEFAULT_NUMBER_REQUESTS_MINUTE,
143 time_to_ratelimit_reset
144 );
145 let rate_limit_header = RateLimitHeader::new(
146 *remaining_requests,
147 time_to_ratelimit_reset,
148 time_to_ratelimit_reset,
149 );
150 response.update_rate_limit_headers(rate_limit_header);
151 return Err(error::GRError::RateLimitExceeded(RateLimitHeader::new(
152 *remaining_requests,
153 time_to_ratelimit_reset,
154 time_to_ratelimit_reset,
155 ))
156 .into());
157 }
158 } else {
159 return Err(error::GRError::ApplicationError(
160 "http module rate limiting - Cannot read remaining \
161 http requests"
162 .to_string(),
163 )
164 .into());
165 }
166
167 if let Ok(mut remaining_requests) = remaining_requests.lock() {
168 let current_time = now_epoch_seconds();
171 let mut time_to_reset = time_to_ratelimit_reset.lock().unwrap();
172 if current_time > *time_to_reset {
173 *remaining_requests = api_defaults::DEFAULT_NUMBER_REQUESTS_MINUTE;
174 *time_to_reset = current_time + Seconds::new(60);
175 }
176 *remaining_requests -= 1;
177 log_info!(
179 "Remaining requests: {}, reset in: {} seconds",
180 *remaining_requests,
181 time::epoch_to_seconds_relative(*time_to_reset)
182 );
183 } else {
184 return Err(error::GRError::ApplicationError(
185 "http module rate limiting - Cannot decrease counter \
186 number of requests pending"
187 .to_string(),
188 )
189 .into());
190 }
191
192 Ok(())
193}
194
195#[derive(Default)]
196pub struct Resource {
197 pub url: String,
198 pub api_operation: Option<ApiOperation>,
199}
200
201impl Resource {
202 pub fn new(url: &str, api_operation: Option<ApiOperation>) -> Self {
203 Resource {
204 url: url.to_string(),
205 api_operation,
206 }
207 }
208}
209
210#[derive(Serialize, Clone, Debug, Default)]
211pub struct Body<T>(HashMap<String, T>);
212
213impl<T> Body<T> {
214 pub fn new() -> Self {
215 Body(HashMap::new())
216 }
217
218 pub fn add<K: Into<String>>(&mut self, key: K, value: T) {
219 self.0.insert(key.into(), value);
220 }
221}
222
223#[derive(Clone, Debug, Default, Serialize, Deserialize)]
224pub struct Headers(HashMap<String, String>);
225
226impl Headers {
227 pub fn new() -> Self {
228 Headers(HashMap::new())
229 }
230
231 pub fn set<K: Into<String>, V: Into<String>>(&mut self, key: K, value: V) {
232 self.0.insert(key.into(), value.into());
233 }
234
235 pub fn get(&self, key: &str) -> Option<&String> {
236 self.0.get(key)
237 }
238
239 pub fn iter(&self) -> hash_map::Iter<String, String> {
240 self.0.iter()
241 }
242
243 pub fn extend(&mut self, headers: Headers) {
244 for (key, value) in headers.iter() {
245 self.0.insert(key.clone(), value.clone());
246 }
247 }
248}
249
250#[derive(Builder)]
251#[builder(pattern = "owned")]
252pub struct Request<'a, T> {
253 #[builder(setter(into, strip_option), default)]
254 pub body: Option<&'a Body<T>>,
255 #[builder(default)]
256 headers: Headers,
257 pub method: Method,
258 pub resource: Resource,
259 #[builder(setter(into, strip_option), default)]
260 pub max_pages: Option<i64>,
261}
262
263impl<'a, T> Request<'a, T> {
264 pub fn builder() -> RequestBuilder<'a, T> {
265 RequestBuilder::default()
266 }
267
268 pub fn new(url: &str, method: Method) -> Self {
269 Request {
270 body: None,
271 headers: Headers::new(),
272 method,
273 resource: Resource::new(url, None),
274 max_pages: None,
275 }
276 }
277
278 pub fn set_max_pages(&mut self, max_pages: i64) {
279 self.max_pages = Some(max_pages);
280 }
281
282 pub fn with_api_operation(mut self, api_operation: ApiOperation) -> Self {
283 self.resource.api_operation = Some(api_operation);
284 self
285 }
286
287 pub fn api_operation(&self) -> &Option<ApiOperation> {
288 &self.resource.api_operation
289 }
290
291 pub fn set_header(&mut self, key: &str, value: &str) {
292 self.headers.set(key.to_string(), value.to_string());
293 }
294
295 pub fn set_headers(&mut self, headers: Headers) {
296 self.headers = headers;
297 }
298
299 pub fn set_url(&mut self, url: &str) {
300 self.resource.url = url.to_string();
301 }
302
303 pub fn url(&self) -> &str {
304 &self.resource.url
305 }
306
307 pub fn headers(&self) -> &Headers {
308 &self.headers
309 }
310}
311
312#[derive(Clone, Debug, Default, Eq, PartialEq)]
313pub enum Method {
314 #[default]
315 HEAD,
316 GET,
317 POST,
318 PUT,
319 PATCH,
320}
321
322impl<C: Cache<Resource>> HttpRunner for Client<C> {
323 type Response = HttpResponse;
324
325 fn run<T: Serialize>(&self, cmd: &mut Request<T>) -> Result<Self::Response> {
326 match cmd.method {
327 Method::GET => {
328 let mut default_response = HttpResponse::builder().build().unwrap();
329 match self.cache.get(&cmd.resource) {
330 Ok(CacheState::Fresh(mut response)) => {
331 log_debug!("Cache fresh for {}", cmd.resource.url);
332 if !self.refresh_cache {
333 log_debug!("Returning local cached response");
334 response.local_cache = true;
335 return Ok(response);
336 }
337 default_response = response;
338 }
339 Ok(CacheState::Stale(response)) => {
340 log_debug!("Cache stale for {}", cmd.resource.url);
341 default_response = response;
342 }
343 Ok(CacheState::None) => {}
344 Err(err) => return Err(err),
345 }
346 if let Some(etag) = default_response.get_etag() {
349 cmd.set_header("If-None-Match", etag);
350 }
351 let response = self.submit(cmd)?;
353 if response.status == 304 {
354 self.cache
358 .update(&cmd.resource, &response, &ResponseField::Headers)?;
359 return Ok(default_response);
360 }
361 self.cache.set(&cmd.resource, &response).unwrap();
362 Ok(response)
363 }
364 _ => Ok(self.submit(cmd)?),
365 }
366 }
367
368 fn api_max_pages<T: Serialize>(&self, cmd: &Request<T>) -> u32 {
369 let max_pages = self
370 .config
371 .get_max_pages(cmd.resource.api_operation.as_ref().unwrap());
372 max_pages
373 }
374}
375
376pub struct Paginator<'a, R, T> {
377 request: Request<'a, T>,
378 page_url: Option<String>,
379 iter: u32,
380 backoff: Backoff<'a, R>,
381 throttler: Box<dyn ThrottleStrategy>,
382 max_pages: u32,
383}
384
385impl<'a, R: HttpRunner, T: Serialize> Paginator<'a, R, T> {
386 pub fn new(
387 runner: &'a Arc<R>,
388 request: Request<'a, T>,
389 page_url: &str,
390 backoff: Backoff<'a, R>,
391 throttle_strategy: Box<dyn ThrottleStrategy>,
392 ) -> Self {
393 let max_pages = if let Some(max_pages) = request.max_pages {
394 max_pages as u32
395 } else {
396 runner.api_max_pages(&request)
397 };
398 Paginator {
399 request,
400 page_url: Some(page_url.to_string()),
401 iter: 0,
402 backoff,
403 throttler: throttle_strategy,
404 max_pages,
405 }
406 }
407}
408
409impl<T: Serialize, R: HttpRunner<Response = HttpResponse>> Iterator for Paginator<'_, R, T> {
410 type Item = Result<HttpResponse>;
411
412 fn next(&mut self) -> Option<Self::Item> {
413 if let Some(page_url) = &self.page_url {
414 if self.iter >= self.max_pages {
415 return None;
416 }
417 if self.iter >= 1 {
418 self.request.set_url(page_url);
419 }
420 log_info!("Requesting page: {}", self.iter + 1);
421 log_info!("URL: {}", self.request.url());
422
423 let response = match self.backoff.retry_on_error(&mut self.request) {
424 Ok(response) => {
425 if let Some(page_headers) = response.get_page_headers().borrow() {
426 match (page_headers.next_page(), page_headers.last_page()) {
427 (Some(next), _) => self.page_url = Some(next.url().to_string()),
428 (None, _) => self.page_url = None,
429 }
430 } else {
431 self.page_url = None;
432 };
433 Ok(response)
434 }
435 Err(err) => {
436 self.page_url = None;
437 Err(err)
438 }
439 };
440 self.iter += 1;
441 if self.iter < self.max_pages && self.page_url.is_some() {
442 let response = response.as_ref().unwrap();
443 if !response.local_cache {
446 if self.throttler.strategy() == ThrottleStrategyType::AutoRate {
447 if self.iter >= api_defaults::ENGAGE_AUTORATE_THROTTLING_THRESHOLD {
448 self.throttler
449 .throttle(Some(response.get_flow_control_headers()));
450 }
451 } else {
452 self.throttler
453 .throttle(Some(response.get_flow_control_headers()));
454 }
455 }
456 }
457 return Some(response);
458 }
459 None
460 }
461}
462
463#[cfg(test)]
464mod test {
465 use throttle::NoThrottle;
466
467 use super::*;
468
469 use crate::{
470 api_defaults::REST_API_MAX_PAGES,
471 backoff::Exponential,
472 cache,
473 io::{Page, PageHeader},
474 test::utils::{ConfigMock, MockRunner, MockThrottler},
475 };
476
477 fn header_processor_next_page_no_last() -> Rc<Option<PageHeader>> {
478 let mut page_header = PageHeader::new();
479 page_header.set_next_page(Page::new("http://localhost?page=2", 1));
480 Rc::new(Some(page_header))
481 }
482
483 fn header_processor_last_page_no_next() -> Rc<Option<PageHeader>> {
484 let mut page_header = PageHeader::new();
485 page_header.set_last_page(Page::new("http://localhost?page=2", 1));
486 Rc::new(Some(page_header))
487 }
488
489 fn response_with_next_page() -> HttpResponse {
490 let mut headers = Headers::new();
491 headers.set("link".to_string(), "http://localhost?page=2".to_string());
492 let flow_control_headers =
493 FlowControlHeaders::new(header_processor_next_page_no_last(), Rc::new(None));
494 let response = HttpResponse::builder()
495 .status(200)
496 .headers(headers)
497 .flow_control_headers(flow_control_headers)
498 .build()
499 .unwrap();
500 response
501 }
502
503 fn response_with_last_page() -> HttpResponse {
504 let mut headers = Headers::new();
505 headers.set("link".to_string(), "http://localhost?page=2".to_string());
506 let flow_control_headers =
507 FlowControlHeaders::new(header_processor_last_page_no_next(), Rc::new(None));
508 let response = HttpResponse::builder()
509 .status(200)
510 .headers(headers)
511 .flow_control_headers(flow_control_headers)
512 .build()
513 .unwrap();
514 response
515 }
516
517 fn cached_response_next_page() -> HttpResponse {
518 let mut headers = Headers::new();
519 headers.set("link".to_string(), "http://localhost?page=2".to_string());
520 let flow_control_headers =
521 FlowControlHeaders::new(header_processor_next_page_no_last(), Rc::new(None));
522 let response = HttpResponse::builder()
523 .status(200)
524 .headers(headers)
525 .flow_control_headers(flow_control_headers)
526 .local_cache(true)
527 .build()
528 .unwrap();
529 response
530 }
531
532 fn cached_response_last_page() -> HttpResponse {
533 let mut headers = Headers::new();
534 headers.set("link".to_string(), "http://localhost?page=2".to_string());
535 let flow_control_headers =
536 FlowControlHeaders::new(header_processor_last_page_no_next(), Rc::new(None));
537 let response = HttpResponse::builder()
538 .status(200)
539 .headers(headers)
540 .flow_control_headers(flow_control_headers)
541 .local_cache(true)
542 .build()
543 .unwrap();
544 response
545 }
546
547 #[test]
548 fn test_paginator_no_headers_no_next_no_last_pages() {
549 let response = HttpResponse::builder().status(200).build().unwrap();
550 let client = Arc::new(MockRunner::new(vec![response]));
551 let request: Request<()> = Request::new("http://localhost", Method::GET);
552 let throttler: Box<dyn ThrottleStrategy> = Box::new(NoThrottle::default());
553 let backoff = Backoff::new(
554 &client,
555 0,
556 60,
557 time::now_epoch_seconds,
558 Box::new(Exponential),
559 Box::new(throttle::DynamicFixed),
560 );
561 let paginator = Paginator::new(&client, request, "http://localhost", backoff, throttler);
562 let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
563 assert_eq!(1, responses.len());
564 assert_eq!("http://localhost", *client.url());
565 }
566
567 #[test]
568 fn test_paginator_with_link_headers_one_next_and_no_last_pages() {
569 let response1 = response_with_next_page();
570 let mut headers = Headers::new();
571 headers.set("link".to_string(), "http://localhost?page=2".to_string());
572 let response2 = HttpResponse::builder()
573 .status(200)
574 .headers(headers)
575 .build()
576 .unwrap();
577 let client = Arc::new(MockRunner::new(vec![response2, response1]));
578 let request: Request<()> = Request::new("http://localhost", Method::GET);
579 let throttler: Box<dyn ThrottleStrategy> = Box::new(NoThrottle::default());
580 let backoff = Backoff::new(
581 &client,
582 0,
583 60,
584 time::now_epoch_seconds,
585 Box::new(Exponential),
586 Box::new(throttle::DynamicFixed),
587 );
588 let paginator = Paginator::new(&client, request, "http://localhost", backoff, throttler);
589 let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
590 assert_eq!(2, responses.len());
591 }
592
593 #[test]
594 fn test_paginator_with_link_headers_one_next_and_one_last_pages() {
595 let response1 = response_with_next_page();
596 let response2 = response_with_last_page();
597 let client = Arc::new(MockRunner::new(vec![response2, response1]));
598 let request: Request<()> = Request::new("http://localhost", Method::GET);
599 let throttler: Box<dyn ThrottleStrategy> = Box::new(NoThrottle::default());
600 let backoff = Backoff::new(
601 &client,
602 0,
603 60,
604 time::now_epoch_seconds,
605 Box::new(Exponential),
606 Box::new(throttle::DynamicFixed),
607 );
608 let paginator = Paginator::new(&client, request, "http://localhost", backoff, throttler);
609 let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
610 assert_eq!(2, responses.len());
611 }
612
613 #[test]
614 fn test_paginator_error_response() {
615 let response = HttpResponse::builder()
616 .status(500)
617 .body("Internal Server Error".to_string())
618 .build()
619 .unwrap();
620 let client = Arc::new(MockRunner::new(vec![response]));
621 let request: Request<()> = Request::new("http://localhost", Method::GET);
622 let throttler: Box<dyn ThrottleStrategy> = Box::new(NoThrottle::default());
623 let backoff = Backoff::new(
624 &client,
625 0,
626 60,
627 time::now_epoch_seconds,
628 Box::new(Exponential),
629 Box::new(throttle::DynamicFixed),
630 );
631 let paginator = Paginator::new(&client, request, "http://localhost", backoff, throttler);
632 let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
633 assert_eq!(1, responses.len());
634 assert_eq!("http://localhost", *client.url());
635 }
636
637 #[test]
638 fn test_client_get_api_max_pages() {
639 let config = Arc::new(ConfigMock::new(1));
640 let runner = Client::new(cache::NoCache, config, false);
641 let cmd: Request<()> =
642 Request::new("http://localhost", Method::GET).with_api_operation(ApiOperation::Project);
643 assert_eq!(1, runner.api_max_pages(&cmd))
644 }
645
646 #[test]
647 fn test_paginator_stops_paging_after_api_max_pages_reached() {
648 let response1 = response_with_next_page();
649 let response2 = response_with_next_page();
650 let response3 = response_with_last_page();
651 let client = Arc::new(
653 MockRunner::new(vec![response3, response2, response1]).with_config(ConfigMock::new(1)),
654 );
655 let request: Request<()> = Request::new("http://localhost", Method::GET);
656 let throttler: Box<dyn ThrottleStrategy> = Box::new(NoThrottle::default());
657 let backoff = Backoff::new(
658 &client,
659 0,
660 60,
661 time::now_epoch_seconds,
662 Box::new(Exponential),
663 Box::new(throttle::DynamicFixed),
664 );
665 let paginator = Paginator::new(&client, request, "http://localhost", backoff, throttler);
666 let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
667 assert_eq!(1, responses.len());
668 }
669
670 #[test]
671 fn test_paginator_limits_to_max_pages_default() {
672 let api_max_pages = REST_API_MAX_PAGES + 5;
673 let mut responses = Vec::new();
674 for _ in 0..api_max_pages {
675 let response = response_with_next_page();
676 responses.push(response);
677 }
678 let last_response = response_with_last_page();
679 responses.push(last_response);
680 responses.reverse();
681 let request: Request<()> = Request::new("http://localhost", Method::GET);
682 let client = Arc::new(MockRunner::new(responses));
683 let throttler: Box<dyn ThrottleStrategy> = Box::new(NoThrottle::default());
684 let backoff = Backoff::new(
685 &client,
686 0,
687 60,
688 time::now_epoch_seconds,
689 Box::new(Exponential),
690 Box::new(throttle::DynamicFixed),
691 );
692 let paginator = Paginator::new(&client, request, "http://localhost", backoff, throttler);
693 let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
694 assert_eq!(REST_API_MAX_PAGES, responses.len() as u32);
695 }
696
697 #[test]
698 fn test_ratelimit_remaining_threshold_reached_is_error() {
699 let mut headers = Headers::new();
700 headers.set("x-ratelimit-remaining".to_string(), "10".to_string());
701 let flow_control_headers = FlowControlHeaders::new(
702 Rc::new(None),
703 Rc::new(Some(RateLimitHeader::new(
704 10,
705 Seconds::new(60),
706 Seconds::new(60),
707 ))),
708 );
709 let mut response = HttpResponse::builder()
710 .status(200)
711 .headers(headers)
712 .flow_control_headers(flow_control_headers)
713 .build()
714 .unwrap();
715 let client = Client::new(cache::NoCache, Arc::new(ConfigMock::new(1)), false);
716 assert!(client.handle_rate_limit(&mut response).is_err());
717 }
718
719 #[test]
720 fn test_ratelimit_remaining_threshold_not_reached_is_ok() {
721 let mut headers = Headers::new();
722 headers.set("ratelimit-remaining".to_string(), "11".to_string());
723 let mut response = HttpResponse::builder()
724 .status(200)
725 .headers(headers)
726 .build()
727 .unwrap();
728 let client = Client::new(cache::NoCache, Arc::new(ConfigMock::new(1)), false);
729 assert!(client.handle_rate_limit(&mut response).is_ok());
730 }
731
732 fn epoch_seconds_now_mock(secs: u64) -> Seconds {
733 Seconds::new(secs)
734 }
735
736 #[test]
737 fn test_remaining_requests_below_threshold_all_fail() {
738 let remaining_requests = Arc::new(Mutex::new(4));
740 let time_to_ratelimit_reset = Arc::new(Mutex::new(Seconds::new(10)));
742 let now = || -> Seconds { epoch_seconds_now_mock(1) };
743 let config: Arc<dyn ConfigProperties> = Arc::new(ConfigMock::new(1));
744
745 let mut threads = Vec::new();
747 for _ in 0..10 {
748 let remaining_requests = remaining_requests.clone();
749 let time_to_ratelimit_reset = time_to_ratelimit_reset.clone();
750 let config = config.clone();
751 threads.push(std::thread::spawn(move || {
752 let mut response = HttpResponse::builder().status(200).build().unwrap();
753 let result = default_rate_limit_handler(
754 &mut response,
755 &config,
756 &time_to_ratelimit_reset,
757 &remaining_requests,
758 now,
759 );
760 assert!(result.is_err());
761 }));
762 }
763 for thread in threads {
764 thread.join().unwrap();
765 }
766 }
767
768 #[test]
769 fn test_time_to_reset_achieved_resets_counter_all_ok() {
770 let remaining_requests = Arc::new(Mutex::new(11));
772 let time_to_ratelimit_reset = Arc::new(Mutex::new(Seconds::new(1)));
773 let now = || -> Seconds { epoch_seconds_now_mock(2) };
775 let config: Arc<dyn ConfigProperties> = Arc::new(ConfigMock::new(1));
776
777 let mut threads = Vec::new();
778 for _ in 0..70 {
784 let remaining_requests = remaining_requests.clone();
785 let time_to_ratelimit_reset = time_to_ratelimit_reset.clone();
786 let config = config.clone();
787 threads.push(std::thread::spawn(move || {
788 let mut response = HttpResponse::builder().status(200).build().unwrap();
789 let result = default_rate_limit_handler(
790 &mut response,
791 &config,
792 &time_to_ratelimit_reset,
793 &remaining_requests,
794 now,
795 );
796 assert!(result.is_ok());
797 }));
798 }
799 for thread in threads {
800 thread.join().unwrap();
801 }
802 }
803
804 #[test]
805 fn test_paginator_stops_paging_after_http_request_max_pages_reached() {
806 let response1 = response_with_next_page();
807 let response2 = response_with_next_page();
808 let response3 = response_with_last_page();
809 let client = Arc::new(MockRunner::new(vec![response3, response2, response1]));
810 let request: Request<()> = Request::builder()
811 .method(Method::GET)
812 .resource(Resource::new("http://localhost", None))
813 .max_pages(1)
814 .build()
815 .unwrap();
816 let throttler: Box<dyn ThrottleStrategy> = Box::new(NoThrottle::default());
817 let backoff = Backoff::new(
818 &client,
819 0,
820 60,
821 time::now_epoch_seconds,
822 Box::new(Exponential),
823 Box::new(throttle::DynamicFixed),
824 );
825 let paginator = Paginator::new(&client, request, "http://localhost", backoff, throttler);
826 let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
827 assert_eq!(1, responses.len());
828 }
829
830 #[test]
831 fn test_paginator_fixed_throttle_enabled() {
832 let response1 = response_with_next_page();
833 let response2 = response_with_next_page();
834 let response3 = response_with_last_page();
835 let client = Arc::new(MockRunner::new(vec![response3, response2, response1]));
836 let request: Request<()> = Request::new("http://localhost", Method::GET);
837 let throttler = Rc::new(MockThrottler::new(None));
838 let bthrottler: Box<dyn ThrottleStrategy> = Box::new(Rc::clone(&throttler));
839 let backoff = Backoff::new(
840 &client,
841 0,
842 60,
843 time::now_epoch_seconds,
844 Box::new(Exponential),
845 Box::new(throttle::DynamicFixed),
846 );
847 let paginator = Paginator::new(&client, request, "http://localhost", backoff, bthrottler);
848 let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
849 assert_eq!(3, responses.len());
850 assert_eq!(2, *throttler.throttled());
851 }
852
853 #[test]
854 fn test_paginator_range_throttle_enabled() {
855 let response1 = response_with_next_page();
856 let response2 = response_with_last_page();
857 let client = Arc::new(MockRunner::new(vec![response2, response1]));
858 let request: Request<()> = Request::new("http://localhost", Method::GET);
859 let throttler = Rc::new(MockThrottler::new(None));
860 let bthrottler: Box<dyn ThrottleStrategy> = Box::new(Rc::clone(&throttler));
861 let backoff = Backoff::new(
862 &client,
863 0,
864 60,
865 time::now_epoch_seconds,
866 Box::new(Exponential),
867 Box::new(throttle::DynamicFixed),
868 );
869 let paginator = Paginator::new(&client, request, "http://localhost", backoff, bthrottler);
870 let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
871 assert_eq!(2, responses.len());
872 assert_eq!(1, *throttler.throttled());
873 }
874
875 #[test]
876 fn test_paginator_no_throttle_if_response_is_from_local_cache() {
877 let response1 = cached_response_next_page();
878 let response2 = cached_response_next_page();
879 let response3 = cached_response_last_page();
880 let client = Arc::new(MockRunner::new(vec![response3, response2, response1]));
881 let request: Request<()> = Request::new("http://localhost", Method::GET);
882 let throttler = Rc::new(MockThrottler::new(None));
883 let bthrottler: Box<dyn ThrottleStrategy> = Box::new(Rc::clone(&throttler));
884 let backoff = Backoff::new(
885 &client,
886 0,
887 60,
888 time::now_epoch_seconds,
889 Box::new(Exponential),
890 Box::new(throttle::DynamicFixed),
891 );
892 let paginator = Paginator::new(&client, request, "http://localhost", backoff, bthrottler);
893 let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
894 assert_eq!(3, responses.len());
895 assert_eq!(0, *throttler.throttled());
896 }
897
898 #[test]
899 fn test_user_request_from_up_to_pages_takes_over_max_api_pages() {
900 let mut responses = Vec::new();
901 for _ in 0..4 {
902 let response = response_with_next_page();
903 responses.push(response);
904 }
905 let last_response = response_with_last_page();
906 responses.push(last_response);
907 responses.reverse();
908 let client = Arc::new(MockRunner::new(responses).with_config(ConfigMock::new(2)));
910 let request: Request<()> = Request::builder()
911 .method(Method::GET)
912 .resource(Resource::new("http://localhost", None))
913 .max_pages(5)
915 .build()
916 .unwrap();
917 let backoff = Backoff::new(
918 &client,
919 0,
920 60,
921 time::now_epoch_seconds,
922 Box::new(Exponential),
923 Box::new(throttle::DynamicFixed),
924 );
925 let throttler: Box<dyn ThrottleStrategy> = Box::new(NoThrottle::default());
926 let paginator = Paginator::new(&client, request, "http://localhost", backoff, throttler);
927 let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
928 assert_eq!(5, responses.len());
929 }
930
931 #[test]
932 fn test_paginator_auto_throttle_enabled_after_autorate_engage_threshold() {
933 let response1 = response_with_next_page();
934 let response2 = response_with_next_page();
935 let response3 = response_with_next_page();
936 let response4 = response_with_next_page();
938 let response5 = response_with_last_page();
939 let client = Arc::new(MockRunner::new(vec![
940 response5, response4, response3, response2, response1,
941 ]));
942 let request: Request<()> = Request::builder()
943 .method(Method::GET)
944 .resource(Resource::new("http://localhost", None))
945 .max_pages(5)
946 .build()
947 .unwrap();
948 let throttler = Rc::new(MockThrottler::new(Some(
949 throttle::ThrottleStrategyType::AutoRate,
950 )));
951 let bthrottler: Box<dyn ThrottleStrategy> = Box::new(Rc::clone(&throttler));
952 let backoff = Backoff::new(
953 &client,
954 0,
955 60,
956 time::now_epoch_seconds,
957 Box::new(Exponential),
958 Box::new(throttle::DynamicFixed),
959 );
960 let paginator = Paginator::new(&client, request, "http://localhost", backoff, bthrottler);
961 let responses = paginator.collect::<Vec<Result<HttpResponse>>>();
962 assert_eq!(5, responses.len());
963 assert_eq!(2, *throttler.throttled());
964 }
965}