1pub mod throttle;
2
3use crate::api_traits::ApiOperation;
4use crate::backoff::Backoff;
5use crate::cache::{Cache, CacheState};
6use crate::config::ConfigProperties;
7use crate::error::GRError;
8use crate::io::{
9 parse_page_headers, parse_ratelimit_headers, FlowControlHeaders, HttpResponse, HttpRunner,
10 RateLimitHeader, ResponseField,
11};
12use crate::time::{self, now_epoch_seconds, Seconds};
13use crate::{api_defaults, error, log_debug, log_error};
14use crate::{log_info, Result};
15use serde::{Deserialize, Serialize};
16use std::borrow::Borrow;
17use std::collections::{hash_map, HashMap};
18use std::iter::Iterator;
19use std::rc::Rc;
20use std::sync::{Arc, Mutex};
21use throttle::{ThrottleStrategy, ThrottleStrategyType};
22
23pub struct Client<C> {
24 cache: C,
25 config: Arc<dyn ConfigProperties>,
26 refresh_cache: bool,
27 time_to_ratelimit_reset: Mutex<Seconds>,
28 remaining_requests: Mutex<u32>,
29 http_agent: ureq::Agent,
30}
31
32impl<C> Client<C> {
34 pub fn new(cache: C, config: Arc<dyn ConfigProperties>, refresh_cache: bool) -> Self {
35 let remaining_requests = Mutex::new(api_defaults::DEFAULT_NUMBER_REQUESTS_MINUTE);
36 let time_to_ratelimit_reset = Mutex::new(now_epoch_seconds() + Seconds::new(60));
37 let http_config = ureq::Agent::config_builder()
38 .http_status_as_error(false)
42 .build();
43 Client {
44 cache,
45 refresh_cache,
46 config,
47 time_to_ratelimit_reset,
48 remaining_requests,
49 http_agent: http_config.into(),
50 }
51 }
52
53 fn submit<T: Serialize>(&self, request: &Request<T>) -> Result<HttpResponse> {
54 let response = match request.method {
55 Method::GET => {
56 let req = self.http_agent.get(request.url());
57 let req = Self::add_headers(req, request.headers());
58 req.call()
59 }
60 Method::HEAD => {
61 let req = self.http_agent.head(request.url());
62 let req = Self::add_headers(req, request.headers());
63 req.call()
64 }
65 Method::POST => {
66 let req = self.http_agent.post(request.url());
67 let req = Self::add_headers(req, request.headers());
68 req.send_json(serde_json::to_value(request.body).unwrap())
69 }
70 Method::PATCH => {
71 let req = self.http_agent.patch(request.url());
72 let req = Self::add_headers(req, request.headers());
73 req.send_json(serde_json::to_value(request.body).unwrap())
74 }
75 Method::PUT => {
76 let req = self.http_agent.put(request.url());
77 let req = Self::add_headers(req, request.headers());
78 req.send_json(serde_json::to_value(request.body).unwrap())
79 }
80 };
81
82 match response {
83 Ok(response) => {
84 let status = response.status();
85 let headers =
87 response
88 .headers()
89 .iter()
90 .fold(Headers::new(), |mut headers, (name, value)| {
91 headers.set::<String, String>(
92 name.to_string(),
93 value.to_str().unwrap().to_string(),
94 );
95 headers
96 });
97 let rate_limit_header = Rc::new(parse_ratelimit_headers(Some(&headers)));
98 let page_header = Rc::new(parse_page_headers(Some(&headers)));
99 let flow_control_headers = FlowControlHeaders::new(page_header, rate_limit_header);
100 log_debug!("Response headers: {:?}", headers);
102 let mut body = response.into_body();
103 let mut response = HttpResponse::builder()
104 .status(status.as_u16() as i32)
105 .headers(headers)
106 .body(body.read_to_string().unwrap_or_default())
107 .flow_control_headers(flow_control_headers)
108 .build()
109 .unwrap();
110
111 self.handle_rate_limit(&mut response)?;
112 Ok(response)
113 }
114 Err(err) => Err(GRError::HttpTransportError(err.to_string()).into()),
115 }
116 }
117
118 fn add_headers<B>(
119 mut builder: ureq::RequestBuilder<B>,
120 headers: &Headers,
121 ) -> ureq::RequestBuilder<B> {
122 for (key, value) in headers.iter() {
123 builder = builder.header(key, value);
124 }
125 builder
126 }
127}
128
129impl<C> Client<C> {
130 fn handle_rate_limit(&self, response: &mut HttpResponse) -> Result<()> {
131 if let Some(headers) = response.get_ratelimit_headers().borrow() {
132 if headers.remaining <= self.config.rate_limit_remaining_threshold() {
133 log_error!("Rate limit threshold reached");
134 return Err(error::GRError::RateLimitExceeded(*headers).into());
135 }
136 Ok(())
137 } else {
138 log_info!("Rate limit headers not provided by remote, using defaults");
143 default_rate_limit_handler(
144 response,
145 &self.config,
146 &self.time_to_ratelimit_reset,
147 &self.remaining_requests,
148 now_epoch_seconds,
149 )
150 }
151 }
152}
153
154fn default_rate_limit_handler(
155 response: &mut HttpResponse,
156 config: &Arc<dyn ConfigProperties>,
157 time_to_ratelimit_reset: &Mutex<Seconds>,
158 remaining_requests: &Mutex<u32>,
159 now_epoch_seconds: fn() -> Seconds,
160) -> Result<()> {
161 if let Ok(remaining_requests) = remaining_requests.lock() {
164 if *remaining_requests <= config.rate_limit_remaining_threshold() {
165 let time_to_ratelimit_reset =
166 *time_to_ratelimit_reset.lock().unwrap() - now_epoch_seconds();
167 log_error!(
168 "Remote does not provide rate limit headers, \
169 so the default rate limit of {} per minute has been \
170 exceeded. Try again in {} seconds",
171 api_defaults::DEFAULT_NUMBER_REQUESTS_MINUTE,
172 time_to_ratelimit_reset
173 );
174 let rate_limit_header = RateLimitHeader::new(
175 *remaining_requests,
176 time_to_ratelimit_reset,
177 time_to_ratelimit_reset,
178 );
179 response.update_rate_limit_headers(rate_limit_header);
180 return Err(error::GRError::RateLimitExceeded(RateLimitHeader::new(
181 *remaining_requests,
182 time_to_ratelimit_reset,
183 time_to_ratelimit_reset,
184 ))
185 .into());
186 }
187 } else {
188 return Err(error::GRError::ApplicationError(
189 "http module rate limiting - Cannot read remaining \
190 http requests"
191 .to_string(),
192 )
193 .into());
194 }
195
196 if let Ok(mut remaining_requests) = remaining_requests.lock() {
197 let current_time = now_epoch_seconds();
200 let mut time_to_reset = time_to_ratelimit_reset.lock().unwrap();
201 if current_time > *time_to_reset {
202 *remaining_requests = api_defaults::DEFAULT_NUMBER_REQUESTS_MINUTE;
203 *time_to_reset = current_time + Seconds::new(60);
204 }
205 *remaining_requests -= 1;
206 log_info!(
208 "Remaining requests: {}, reset in: {} seconds",
209 *remaining_requests,
210 time::epoch_to_seconds_relative(*time_to_reset)
211 );
212 } else {
213 return Err(error::GRError::ApplicationError(
214 "http module rate limiting - Cannot decrease counter \
215 number of requests pending"
216 .to_string(),
217 )
218 .into());
219 }
220
221 Ok(())
222}
223
224#[derive(Default)]
225pub struct Resource {
226 pub url: String,
227 pub api_operation: Option<ApiOperation>,
228}
229
230impl Resource {
231 pub fn new(url: &str, api_operation: Option<ApiOperation>) -> Self {
232 Resource {
233 url: url.to_string(),
234 api_operation,
235 }
236 }
237}
238
239#[derive(Serialize, Clone, Debug, Default)]
240pub struct Body<T>(HashMap<String, T>);
241
242impl<T> Body<T> {
243 pub fn new() -> Self {
244 Body(HashMap::new())
245 }
246
247 pub fn add<K: Into<String>>(&mut self, key: K, value: T) {
248 self.0.insert(key.into(), value);
249 }
250}
251
252#[derive(Clone, Debug, Default, Serialize, Deserialize)]
253pub struct Headers(HashMap<String, String>);
254
255impl Headers {
256 pub fn new() -> Self {
257 Headers(HashMap::new())
258 }
259
260 pub fn set<K: Into<String>, V: Into<String>>(&mut self, key: K, value: V) {
261 self.0.insert(key.into(), value.into());
262 }
263
264 pub fn get(&self, key: &str) -> Option<&String> {
265 self.0.get(key)
266 }
267
268 pub fn iter(&self) -> hash_map::Iter<'_, String, String> {
269 self.0.iter()
270 }
271
272 pub fn extend(&mut self, headers: Headers) {
273 for (key, value) in headers.iter() {
274 self.0.insert(key.clone(), value.clone());
275 }
276 }
277}
278
279#[derive(Builder)]
280#[builder(pattern = "owned")]
281pub struct Request<'a, T> {
282 #[builder(setter(into, strip_option), default)]
283 pub body: Option<&'a Body<T>>,
284 #[builder(default)]
285 headers: Headers,
286 pub method: Method,
287 pub resource: Resource,
288 #[builder(setter(into, strip_option), default)]
289 pub max_pages: Option<i64>,
290}
291
292impl<'a, T> Request<'a, T> {
293 pub fn builder() -> RequestBuilder<'a, T> {
294 RequestBuilder::default()
295 }
296
297 pub fn new(url: &str, method: Method) -> Self {
298 Request {
299 body: None,
300 headers: Headers::new(),
301 method,
302 resource: Resource::new(url, None),
303 max_pages: None,
304 }
305 }
306
307 pub fn set_max_pages(&mut self, max_pages: i64) {
308 self.max_pages = Some(max_pages);
309 }
310
311 pub fn with_api_operation(mut self, api_operation: ApiOperation) -> Self {
312 self.resource.api_operation = Some(api_operation);
313 self
314 }
315
316 pub fn api_operation(&self) -> &Option<ApiOperation> {
317 &self.resource.api_operation
318 }
319
320 pub fn set_header(&mut self, key: &str, value: &str) {
321 self.headers.set(key.to_string(), value.to_string());
322 }
323
324 pub fn set_headers(&mut self, headers: Headers) {
325 self.headers = headers;
326 }
327
328 pub fn set_url(&mut self, url: &str) {
329 self.resource.url = url.to_string();
330 }
331
332 pub fn url(&self) -> &str {
333 &self.resource.url
334 }
335
336 pub fn headers(&self) -> &Headers {
337 &self.headers
338 }
339}
340
/// HTTP methods the client knows how to send.
///
/// `HEAD` is the default variant.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub enum Method {
    /// Sent without a body.
    #[default]
    HEAD,
    /// Sent without a body; the only method that goes through the cache.
    GET,
    /// Sends the request body as JSON.
    POST,
    /// Sends the request body as JSON.
    PUT,
    /// Sends the request body as JSON.
    PATCH,
}
350
351impl<C: Cache<Resource>> HttpRunner for Client<C> {
352 type Response = HttpResponse;
353
354 fn run<T: Serialize>(&self, cmd: &mut Request<T>) -> Result<Self::Response> {
355 match cmd.method {
356 Method::GET => {
357 let mut default_response = HttpResponse::builder().build().unwrap();
358 match self.cache.get(&cmd.resource) {
359 Ok(CacheState::Fresh(mut response)) => {
360 log_debug!("Cache fresh for {}", cmd.resource.url);
361 if !self.refresh_cache {
362 log_debug!("Returning local cached response");
363 response.local_cache = true;
364 return Ok(response);
365 }
366 default_response = response;
367 }
368 Ok(CacheState::Stale(response)) => {
369 log_debug!("Cache stale for {}", cmd.resource.url);
370 default_response = response;
371 }
372 Ok(CacheState::None) => {}
373 Err(err) => return Err(err),
374 }
375 if let Some(etag) = default_response.get_etag() {
378 cmd.set_header("If-None-Match", etag);
379 }
380 let response = self.submit(cmd)?;
382 if response.status == 304 {
383 self.cache
387 .update(&cmd.resource, &response, &ResponseField::Headers)?;
388 return Ok(default_response);
389 }
390 self.cache.set(&cmd.resource, &response).unwrap();
391 Ok(response)
392 }
393 _ => Ok(self.submit(cmd)?),
394 }
395 }
396
397 fn api_max_pages<T: Serialize>(&self, cmd: &Request<T>) -> u32 {
398 let max_pages = self
399 .config
400 .get_max_pages(cmd.resource.api_operation.as_ref().unwrap());
401 max_pages
402 }
403}
404
405pub struct Paginator<'a, R, T> {
406 request: Request<'a, T>,
407 page_url: Option<String>,
408 iter: u32,
409 backoff: Backoff<'a, R>,
410 throttler: Box<dyn ThrottleStrategy>,
411 max_pages: u32,
412}
413
414impl<'a, R: HttpRunner, T: Serialize> Paginator<'a, R, T> {
415 pub fn new(
416 runner: &'a Arc<R>,
417 request: Request<'a, T>,
418 page_url: &str,
419 backoff: Backoff<'a, R>,
420 throttle_strategy: Box<dyn ThrottleStrategy>,
421 ) -> Self {
422 let max_pages = if let Some(max_pages) = request.max_pages {
423 max_pages as u32
424 } else {
425 runner.api_max_pages(&request)
426 };
427 Paginator {
428 request,
429 page_url: Some(page_url.to_string()),
430 iter: 0,
431 backoff,
432 throttler: throttle_strategy,
433 max_pages,
434 }
435 }
436}
437
438impl<T: Serialize, R: HttpRunner<Response = HttpResponse>> Iterator for Paginator<'_, R, T> {
439 type Item = Result<HttpResponse>;
440
441 fn next(&mut self) -> Option<Self::Item> {
442 if let Some(page_url) = &self.page_url {
443 if self.iter >= self.max_pages {
444 return None;
445 }
446 if self.iter >= 1 {
447 self.request.set_url(page_url);
448 }
449 log_info!("Requesting page: {}", self.iter + 1);
450 log_info!("URL: {}", self.request.url());
451
452 let response = match self.backoff.retry_on_error(&mut self.request) {
453 Ok(response) => {
454 if let Some(page_headers) = response.get_page_headers().borrow() {
455 match (page_headers.next_page(), page_headers.last_page()) {
456 (Some(next), _) => self.page_url = Some(next.url().to_string()),
457 (None, _) => self.page_url = None,
458 }
459 } else {
460 self.page_url = None;
461 };
462 Ok(response)
463 }
464 Err(err) => {
465 self.page_url = None;
466 Err(err)
467 }
468 };
469 self.iter += 1;
470 if self.iter < self.max_pages && self.page_url.is_some() {
471 let response = response.as_ref().unwrap();
472 if !response.local_cache {
475 if self.throttler.strategy() == ThrottleStrategyType::AutoRate {
476 if self.iter >= api_defaults::ENGAGE_AUTORATE_THROTTLING_THRESHOLD {
477 self.throttler
478 .throttle(Some(response.get_flow_control_headers()));
479 }
480 } else {
481 self.throttler
482 .throttle(Some(response.get_flow_control_headers()));
483 }
484 }
485 }
486 return Some(response);
487 }
488 None
489 }
490}
491
#[cfg(test)]
mod test {
    use throttle::NoThrottle;

    use super::*;

    use crate::{
        api_defaults::REST_API_MAX_PAGES,
        backoff::Exponential,
        cache,
        io::{Page, PageHeader},
        test::utils::{ConfigMock, MockRunner, MockThrottler},
    };

    /// URL every test request starts from.
    const BASE_URL: &str = "http://localhost";
    /// URL advertised by the page headers used in these tests.
    const SECOND_PAGE_URL: &str = "http://localhost?page=2";

    /// Page headers that announce a next page and no last page.
    fn header_processor_next_page_no_last() -> Rc<Option<PageHeader>> {
        let mut page_header = PageHeader::new();
        page_header.set_next_page(Page::new(SECOND_PAGE_URL, 1));
        Rc::new(Some(page_header))
    }

    /// Page headers that announce a last page and no next page.
    fn header_processor_last_page_no_next() -> Rc<Option<PageHeader>> {
        let mut page_header = PageHeader::new();
        page_header.set_last_page(Page::new(SECOND_PAGE_URL, 1));
        Rc::new(Some(page_header))
    }

    /// 200 response with a `link` header, the given page headers, no
    /// rate-limit headers and the given `local_cache` flag.
    fn paged_response(page_header: Rc<Option<PageHeader>>, local_cache: bool) -> HttpResponse {
        let mut headers = Headers::new();
        headers.set("link", SECOND_PAGE_URL);
        HttpResponse::builder()
            .status(200)
            .headers(headers)
            .flow_control_headers(FlowControlHeaders::new(page_header, Rc::new(None)))
            .local_cache(local_cache)
            .build()
            .unwrap()
    }

    fn response_with_next_page() -> HttpResponse {
        paged_response(header_processor_next_page_no_last(), false)
    }

    fn response_with_last_page() -> HttpResponse {
        paged_response(header_processor_last_page_no_next(), false)
    }

    fn cached_response_next_page() -> HttpResponse {
        paged_response(header_processor_next_page_no_last(), true)
    }

    fn cached_response_last_page() -> HttpResponse {
        paged_response(header_processor_last_page_no_next(), true)
    }

    /// Plain GET request against `BASE_URL` without a page cap.
    fn get_request() -> Request<'static, ()> {
        Request::new(BASE_URL, Method::GET)
    }

    /// GET request against `BASE_URL` built through the builder with an
    /// explicit page cap.
    fn get_request_with_max_pages(max_pages: i64) -> Request<'static, ()> {
        Request::builder()
            .method(Method::GET)
            .resource(Resource::new(BASE_URL, None))
            .max_pages(max_pages)
            .build()
            .unwrap()
    }

    /// Runs a paginator over `client` until it is exhausted, using a backoff
    /// with zero retries, and returns every yielded item.
    fn paginate<R>(
        client: &Arc<R>,
        request: Request<'_, ()>,
        throttler: Box<dyn ThrottleStrategy>,
    ) -> Vec<Result<HttpResponse>>
    where
        R: HttpRunner<Response = HttpResponse>,
    {
        let backoff = Backoff::new(
            client,
            0,
            60,
            time::now_epoch_seconds,
            Box::new(Exponential),
            Box::new(throttle::DynamicFixed),
        );
        Paginator::new(client, request, BASE_URL, backoff, throttler).collect()
    }

    /// Mock throttler plus a boxed handle to it that can be handed to the
    /// paginator while the test keeps access to the counters.
    fn mock_throttler(
        strategy: Option<throttle::ThrottleStrategyType>,
    ) -> (Rc<MockThrottler>, Box<dyn ThrottleStrategy>) {
        let throttler = Rc::new(MockThrottler::new(strategy));
        let boxed: Box<dyn ThrottleStrategy> = Box::new(Rc::clone(&throttler));
        (throttler, boxed)
    }

    fn epoch_seconds_now_mock(secs: u64) -> Seconds {
        Seconds::new(secs)
    }

    /// Calls `default_rate_limit_handler` from `calls` threads sharing one
    /// counter and one reset time, and reports for each call whether it
    /// succeeded.
    fn call_default_handler_concurrently(
        calls: usize,
        remaining: u32,
        reset_at: u64,
        now: fn() -> Seconds,
    ) -> Vec<bool> {
        let remaining_requests = Arc::new(Mutex::new(remaining));
        let time_to_ratelimit_reset = Arc::new(Mutex::new(Seconds::new(reset_at)));
        let config: Arc<dyn ConfigProperties> = Arc::new(ConfigMock::new(1));

        let handles: Vec<_> = (0..calls)
            .map(|_| {
                let remaining_requests = Arc::clone(&remaining_requests);
                let time_to_ratelimit_reset = Arc::clone(&time_to_ratelimit_reset);
                let config = Arc::clone(&config);
                std::thread::spawn(move || {
                    let mut response = HttpResponse::builder().status(200).build().unwrap();
                    default_rate_limit_handler(
                        &mut response,
                        &config,
                        &time_to_ratelimit_reset,
                        &remaining_requests,
                        now,
                    )
                    .is_ok()
                })
            })
            .collect();
        handles
            .into_iter()
            .map(|handle| handle.join().unwrap())
            .collect()
    }

    #[test]
    fn test_paginator_no_headers_no_next_no_last_pages() {
        let response = HttpResponse::builder().status(200).build().unwrap();
        let client = Arc::new(MockRunner::new(vec![response]));
        let responses = paginate(&client, get_request(), Box::new(NoThrottle));
        assert_eq!(1, responses.len());
        assert_eq!("http://localhost", *client.url());
    }

    #[test]
    fn test_paginator_with_link_headers_one_next_and_no_last_pages() {
        let response1 = response_with_next_page();
        let mut headers = Headers::new();
        headers.set("link", SECOND_PAGE_URL);
        let response2 = HttpResponse::builder()
            .status(200)
            .headers(headers)
            .build()
            .unwrap();
        let client = Arc::new(MockRunner::new(vec![response2, response1]));
        let responses = paginate(&client, get_request(), Box::new(NoThrottle));
        assert_eq!(2, responses.len());
    }

    #[test]
    fn test_paginator_with_link_headers_one_next_and_one_last_pages() {
        let client = Arc::new(MockRunner::new(vec![
            response_with_last_page(),
            response_with_next_page(),
        ]));
        let responses = paginate(&client, get_request(), Box::new(NoThrottle));
        assert_eq!(2, responses.len());
    }

    #[test]
    fn test_paginator_error_response() {
        let response = HttpResponse::builder()
            .status(500)
            .body("Internal Server Error".to_string())
            .build()
            .unwrap();
        let client = Arc::new(MockRunner::new(vec![response]));
        let responses = paginate(&client, get_request(), Box::new(NoThrottle));
        assert_eq!(1, responses.len());
        assert_eq!("http://localhost", *client.url());
    }

    #[test]
    fn test_client_get_api_max_pages() {
        let runner = Client::new(cache::NoCache, Arc::new(ConfigMock::new(1)), false);
        let cmd: Request<()> =
            Request::new(BASE_URL, Method::GET).with_api_operation(ApiOperation::Project);
        assert_eq!(1, runner.api_max_pages(&cmd))
    }

    #[test]
    fn test_paginator_stops_paging_after_api_max_pages_reached() {
        let queued = vec![
            response_with_last_page(),
            response_with_next_page(),
            response_with_next_page(),
        ];
        let client = Arc::new(MockRunner::new(queued).with_config(ConfigMock::new(1)));
        let responses = paginate(&client, get_request(), Box::new(NoThrottle));
        assert_eq!(1, responses.len());
    }

    #[test]
    fn test_paginator_limits_to_max_pages_default() {
        let next_pages = (REST_API_MAX_PAGES + 5) as usize;
        let mut queued: Vec<HttpResponse> = std::iter::repeat_with(response_with_next_page)
            .take(next_pages)
            .collect();
        queued.push(response_with_last_page());
        queued.reverse();
        let client = Arc::new(MockRunner::new(queued));
        let responses = paginate(&client, get_request(), Box::new(NoThrottle));
        assert_eq!(REST_API_MAX_PAGES, responses.len() as u32);
    }

    #[test]
    fn test_ratelimit_remaining_threshold_reached_is_error() {
        let mut headers = Headers::new();
        headers.set("x-ratelimit-remaining", "10");
        let rate_limit = RateLimitHeader::new(10, Seconds::new(60), Seconds::new(60));
        let flow_control_headers =
            FlowControlHeaders::new(Rc::new(None), Rc::new(Some(rate_limit)));
        let mut response = HttpResponse::builder()
            .status(200)
            .headers(headers)
            .flow_control_headers(flow_control_headers)
            .build()
            .unwrap();
        let client = Client::new(cache::NoCache, Arc::new(ConfigMock::new(1)), false);
        assert!(client.handle_rate_limit(&mut response).is_err());
    }

    #[test]
    fn test_ratelimit_remaining_threshold_not_reached_is_ok() {
        let mut headers = Headers::new();
        headers.set("ratelimit-remaining", "11");
        let mut response = HttpResponse::builder()
            .status(200)
            .headers(headers)
            .build()
            .unwrap();
        let client = Client::new(cache::NoCache, Arc::new(ConfigMock::new(1)), false);
        assert!(client.handle_rate_limit(&mut response).is_ok());
    }

    #[test]
    fn test_remaining_requests_below_threshold_all_fail() {
        // 4 requests left, window still open (now = 1, reset at 10).
        let outcomes = call_default_handler_concurrently(10, 4, 10, || epoch_seconds_now_mock(1));
        assert_eq!(10, outcomes.len());
        assert!(outcomes.iter().all(|ok| !ok));
    }

    #[test]
    fn test_time_to_reset_achieved_resets_counter_all_ok() {
        // Window already elapsed (now = 2, reset at 1), so the counter is
        // refilled on the first call.
        let outcomes = call_default_handler_concurrently(70, 11, 1, || epoch_seconds_now_mock(2));
        assert_eq!(70, outcomes.len());
        assert!(outcomes.iter().all(|ok| *ok));
    }

    #[test]
    fn test_paginator_stops_paging_after_http_request_max_pages_reached() {
        let client = Arc::new(MockRunner::new(vec![
            response_with_last_page(),
            response_with_next_page(),
            response_with_next_page(),
        ]));
        let responses = paginate(
            &client,
            get_request_with_max_pages(1),
            Box::new(NoThrottle),
        );
        assert_eq!(1, responses.len());
    }

    #[test]
    fn test_paginator_fixed_throttle_enabled() {
        let client = Arc::new(MockRunner::new(vec![
            response_with_last_page(),
            response_with_next_page(),
            response_with_next_page(),
        ]));
        let (throttler, bthrottler) = mock_throttler(None);
        let responses = paginate(&client, get_request(), bthrottler);
        assert_eq!(3, responses.len());
        assert_eq!(2, *throttler.throttled());
    }

    #[test]
    fn test_paginator_range_throttle_enabled() {
        let client = Arc::new(MockRunner::new(vec![
            response_with_last_page(),
            response_with_next_page(),
        ]));
        let (throttler, bthrottler) = mock_throttler(None);
        let responses = paginate(&client, get_request(), bthrottler);
        assert_eq!(2, responses.len());
        assert_eq!(1, *throttler.throttled());
    }

    #[test]
    fn test_paginator_no_throttle_if_response_is_from_local_cache() {
        let client = Arc::new(MockRunner::new(vec![
            cached_response_last_page(),
            cached_response_next_page(),
            cached_response_next_page(),
        ]));
        let (throttler, bthrottler) = mock_throttler(None);
        let responses = paginate(&client, get_request(), bthrottler);
        assert_eq!(3, responses.len());
        assert_eq!(0, *throttler.throttled());
    }

    #[test]
    fn test_user_request_from_up_to_pages_takes_over_max_api_pages() {
        let mut queued: Vec<HttpResponse> = std::iter::repeat_with(response_with_next_page)
            .take(4)
            .collect();
        queued.push(response_with_last_page());
        queued.reverse();
        let client = Arc::new(MockRunner::new(queued).with_config(ConfigMock::new(2)));
        let responses = paginate(
            &client,
            get_request_with_max_pages(5),
            Box::new(NoThrottle),
        );
        assert_eq!(5, responses.len());
    }

    #[test]
    fn test_paginator_auto_throttle_enabled_after_autorate_engage_threshold() {
        let client = Arc::new(MockRunner::new(vec![
            response_with_last_page(),
            response_with_next_page(),
            response_with_next_page(),
            response_with_next_page(),
            response_with_next_page(),
        ]));
        let (throttler, bthrottler) =
            mock_throttler(Some(throttle::ThrottleStrategyType::AutoRate));
        let responses = paginate(&client, get_request_with_max_pages(5), bthrottler);
        assert_eq!(5, responses.len());
        assert_eq!(2, *throttler.throttled());
    }
}