apify_client/
request.rs

1use crate::client::{ApifyClient, ApifyApiError};
2use tokio::time::sleep;
3use std::time::Duration;
4use serde::{Deserialize};
5
6// TODO: Make this configurable
7const MAX_RATE_LIMIT_RETRIES: u8 = 8;
8const MAX_SERVER_FAIL_RETRIES: u8 = 8;
9const MAX_TIMEOUT_RETRIES: u8 = 5;
10
11#[derive(Deserialize, Debug)]
12pub struct ApifyApiErrorRaw {
13    r#type: String,
14    message: String,
15}
16
17// TODO: Remove this
18#[derive(Deserialize, Debug)]
19pub struct ApifyApiErrorRawWrapper {
20    error: ApifyApiErrorRaw
21}
22
23impl ApifyClient {
24    async fn simple_request (
25        &self,
26        url: &str,
27        method: &reqwest::Method,
28        optional_body: &Option<Vec<u8>>,
29        headers: &Option<reqwest::header::HeaderMap>
30    ) -> Result<reqwest::Response, reqwest::Error> {
31        let mut req_builder = match *method {
32            reqwest::Method::GET => self.http_client.get(url),
33            reqwest::Method::POST => self.http_client.post(url),
34            reqwest::Method::PUT => self.http_client.put(url),
35            reqwest::Method::DELETE => self.http_client.delete(url),
36            // This error is only for the developer
37            _ => panic!("Request method not usable with Apify API!"),
38        };
39
40        // TODO: Figure out how to remove the clones here
41        if let Some(body) = optional_body.clone() {
42            println!("Body size is: {}", body.len());
43            req_builder = req_builder.body(body);
44        }
45        if let Some(headers) = headers.clone() {
46            req_builder = req_builder.headers(headers);
47        }
48        req_builder.send().await
49    }
50
51    pub async fn retrying_request (
52        &self,
53        url: &str,
54        method: &reqwest::Method,
55        body: &Option<Vec<u8>>,
56        headers: &Option<reqwest::header::HeaderMap>
57    ) -> Result<reqwest::Response, ApifyApiError> {
58        if self.debug_log {
59            println!("Doing {} request to: {}", method, url);
60        }
61        let mut rate_limit_retry_count: u8 = 0;
62        let mut server_failed_retry_count: u8 = 0;
63        let mut timeout_retry_count: u8 = 0;
64        loop {
65            if rate_limit_retry_count >= MAX_RATE_LIMIT_RETRIES {
66                return Err(ApifyApiError::MaxRateLimitRetriesReached(rate_limit_retry_count));
67            }
68            if server_failed_retry_count >= MAX_SERVER_FAIL_RETRIES {
69                return Err(ApifyApiError::MaxServerFailedRetriesReached(server_failed_retry_count));
70            }
71            if timeout_retry_count >= MAX_TIMEOUT_RETRIES {
72                return Err(ApifyApiError::MaxTimeoutRetriesReached(timeout_retry_count));
73            }
74            // TODO: Remove clones (moved in the loop), request could move back the body if should be retried
75            match self.simple_request(url, method, body, headers).await {
76                Ok(resp) => {
77                    let status_code = resp.status().as_u16();
78                    if status_code == 429 || status_code >= 500 {
79                        let time_to_next_retry;
80                        if status_code == 429 {
81                            rate_limit_retry_count += 1;
82                            // TODO: export this as separate func
83                            time_to_next_retry = self.base_time_to_retry * (2 as u32).pow((rate_limit_retry_count).into());
84                            if self.debug_log {
85                                println!("Request got rate limit(429), retry n. will happen {} in: {} ms", rate_limit_retry_count, time_to_next_retry);
86                            }
87                        } else {
88                            server_failed_retry_count += 1;
89                            time_to_next_retry = self.base_time_to_retry * (2 as u32).pow((server_failed_retry_count).into());
90                            if self.debug_log {
91                                println!("Server failed({}), retry n. will happen {} in: {} ms", status_code, rate_limit_retry_count, time_to_next_retry);
92                            }
93                        }
94                        
95                        sleep(Duration::from_millis(time_to_next_retry.into())).await;
96                        continue;
97                    } else if status_code >= 300 {
98                        let raw_error: ApifyApiErrorRawWrapper = resp.json().await.map_err(
99                            |err| ApifyApiError::ApiFailure(format!("Apify API did not return correct error format. Something is very wrong. Please contact support@apify.com\n{}", err))
100                        )?;
101                        // error route
102                        if status_code == 404 {
103                            return Err(ApifyApiError::NotFound(raw_error.error.message));
104                        }
105                        return Err(ApifyApiError::RawError(raw_error.error.message));
106                        // more types here
107                    } else {
108                        // ok route
109                        return Ok(resp);
110                    }
111                }
112                Err(err) => {
113                    if err.is_timeout() {
114                        timeout_retry_count += 1;
115                        let time_to_next_retry = self.base_time_to_retry * (2 as u32).pow((timeout_retry_count).into());
116                        if self.debug_log {
117                            println!("Request timeouted, retry n. will happen {} in: {} ms", rate_limit_retry_count, time_to_next_retry);
118                        }
119                        sleep(Duration::from_millis(time_to_next_retry.into())).await;
120                        continue;
121                    }
122                    // Maybe other types here
123                    return Err(ApifyApiError::ApiFailure(format!("Uknown error, please create an issue on GitHub! {}", err)));
124                }
125            }
126        }
127    }
128}