1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use crate::client::{ApifyClient, ApifyClientError};
use tokio::time::delay_for;
use std::time::Duration;
use serde::{Deserialize};
const MAX_RATE_LIMIT_RETRIES: u8 = 8;
const MAX_SERVER_FAIL_RETRIES: u8 = 8;
const MAX_TIMEOUT_RETRIES: u8 = 5;
#[derive(Deserialize, Debug)]
pub struct ApifyApiErrorRaw {
r#type: String,
message: String,
}
#[derive(Deserialize, Debug)]
pub struct ApifyApiErrorRawWrapper {
error: ApifyApiErrorRaw
}
impl ApifyClient {
async fn simple_request (
&self,
url: &str,
method: &reqwest::Method,
optional_body: &Option<Vec<u8>>,
headers: &Option<reqwest::header::HeaderMap>
) -> Result<reqwest::Response, reqwest::Error> {
let mut req_builder = match *method {
reqwest::Method::GET => self.client.get(url),
reqwest::Method::POST => self.client.post(url),
reqwest::Method::PUT => self.client.put(url),
reqwest::Method::DELETE => self.client.delete(url),
_ => panic!("Request method not allowed!"),
};
if let Some(body) = optional_body.clone() {
println!("Body size is: {}", body.len());
req_builder = req_builder.body(body);
}
if let Some(headers) = headers.clone() {
req_builder = req_builder.headers(headers);
}
req_builder.send().await
}
pub async fn retrying_request (
&self,
url: &str,
method: &reqwest::Method,
body: &Option<Vec<u8>>,
headers: &Option<reqwest::header::HeaderMap>
) -> Result<reqwest::Response, ApifyClientError> {
if self.debug_log {
println!("Doing request to: {}", url);
}
let mut rate_limit_retry_count: u8 = 0;
let mut server_failed_retry_count: u8 = 0;
let mut timeout_retry_count: u8 = 0;
loop {
if rate_limit_retry_count >= MAX_RATE_LIMIT_RETRIES {
return Err(ApifyClientError::MaxRateLimitRetriesReached(rate_limit_retry_count));
}
if server_failed_retry_count >= MAX_SERVER_FAIL_RETRIES {
return Err(ApifyClientError::MaxServerFailedRetriesReached(server_failed_retry_count));
}
if timeout_retry_count >= MAX_TIMEOUT_RETRIES {
return Err(ApifyClientError::MaxTimeoutRetriesReached(timeout_retry_count));
}
match self.simple_request(url, method, body, headers).await {
Ok(resp) => {
let status_code = resp.status().as_u16();
if status_code == 429 || status_code >= 500 {
let time_to_next_retry;
if status_code == 429 {
rate_limit_retry_count += 1;
time_to_next_retry = self.base_time_to_retry * (2 as u32).pow((rate_limit_retry_count).into());
if self.debug_log {
println!("Request got rate limit(429), retry n. will happen {} in: {} ms", rate_limit_retry_count, time_to_next_retry);
}
} else {
server_failed_retry_count += 1;
time_to_next_retry = self.base_time_to_retry * (2 as u32).pow((server_failed_retry_count).into());
if self.debug_log {
println!("Server failed({}), retry n. will happen {} in: {} ms", status_code, rate_limit_retry_count, time_to_next_retry);
}
}
delay_for(Duration::from_millis(time_to_next_retry.into())).await;
continue;
} else if status_code >= 300 {
let raw_error: ApifyApiErrorRawWrapper = resp.json().await.unwrap();
if status_code == 404 {
return Err(ApifyClientError::NotFound(raw_error.error.message));
}
return Err(ApifyClientError::RawError(raw_error.error.message));
} else {
return Ok(resp);
}
}
Err(err) => {
if err.is_timeout() {
timeout_retry_count += 1;
let time_to_next_retry = self.base_time_to_retry * (2 as u32).pow((timeout_retry_count).into());
if self.debug_log {
println!("Request timeouted, retry n. will happen {} in: {} ms", rate_limit_retry_count, time_to_next_retry);
}
delay_for(Duration::from_millis(time_to_next_retry.into())).await;
continue;
}
panic!("ApifyClientError: Uknown error, please create an issue on GitHub! {}", err);
}
}
}
}
}