1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use crate::client::{ApifyClient, ApifyClientError};
use tokio::time::delay_for;
use std::time::Duration;
use serde::{Deserialize};

// TODO: Make these retry limits configurable

/// Number of HTTP 429 (rate limit) responses after which `retrying_request`
/// stops retrying and returns `ApifyClientError::MaxRateLimitRetriesReached`.
const MAX_RATE_LIMIT_RETRIES: u8 = 8;
/// Number of responses with status >= 500 after which `retrying_request`
/// stops retrying and returns `ApifyClientError::MaxServerFailedRetriesReached`.
const MAX_SERVER_FAIL_RETRIES: u8 = 8;
/// Number of request timeouts after which `retrying_request` stops retrying
/// and returns `ApifyClientError::MaxTimeoutRetriesReached`.
const MAX_TIMEOUT_RETRIES: u8 = 5;

/// Error object deserialized from the JSON body of a failed API response.
///
/// It is read through [`ApifyApiErrorRawWrapper`] by `retrying_request` for
/// responses with a status code of 300 or above that are not retried.
#[derive(Deserialize, Debug)]
pub struct ApifyApiErrorRaw {
    /// Error type string. Written as a raw identifier because `type` is a
    /// reserved keyword in Rust.
    r#type: String,
    /// Error message; `retrying_request` forwards it inside
    /// `ApifyClientError::NotFound` or `ApifyClientError::RawError`.
    message: String,
}

// TODO: Remove this

/// Top-level shape of an error response body: an object whose `error` field
/// holds the actual [`ApifyApiErrorRaw`] payload.
#[derive(Deserialize, Debug)]
pub struct ApifyApiErrorRawWrapper {
    /// The wrapped error payload.
    error: ApifyApiErrorRaw
}

impl ApifyClient {
    /// Returns the delay in milliseconds to wait before retry number `retry_count`.
    ///
    /// The delay grows exponentially as `base_time_to_retry * 2^retry_count`.
    /// Saturating arithmetic is used so a large base time clamps to `u32::MAX`
    /// instead of overflowing.
    fn retry_delay_ms(&self, retry_count: u8) -> u32 {
        let multiplier = 2u32.saturating_pow(u32::from(retry_count));
        self.base_time_to_retry.saturating_mul(multiplier)
    }

    /// Sends a single HTTP request without any retry logic.
    ///
    /// `optional_body` and `headers` are attached to the request when present.
    /// Both are cloned because the request builder takes ownership of them while
    /// the caller keeps its copies for possible retries.
    ///
    /// # Errors
    ///
    /// Returns the `reqwest::Error` produced by sending the request.
    ///
    /// # Panics
    ///
    /// Panics if `method` is anything other than GET, POST, PUT or DELETE.
    async fn simple_request(
        &self,
        url: &str,
        method: &reqwest::Method,
        optional_body: &Option<Vec<u8>>,
        headers: &Option<reqwest::header::HeaderMap>,
    ) -> Result<reqwest::Response, reqwest::Error> {
        let mut req_builder = match *method {
            reqwest::Method::GET => self.client.get(url),
            reqwest::Method::POST => self.client.post(url),
            reqwest::Method::PUT => self.client.put(url),
            reqwest::Method::DELETE => self.client.delete(url),
            _ => panic!("Request method not allowed!"),
        };

        // TODO: Figure out how to remove the clones here
        if let Some(body) = optional_body {
            // Diagnostics are only printed in debug mode, like everywhere else in this impl.
            if self.debug_log {
                println!("Body size is: {}", body.len());
            }
            req_builder = req_builder.body(body.clone());
        }
        if let Some(headers) = headers {
            req_builder = req_builder.headers(headers.clone());
        }
        req_builder.send().await
    }

    /// Sends a request and retries it with exponential backoff on transient failures.
    ///
    /// Retried conditions, each with its own counter and limit:
    /// - HTTP 429 responses, up to `MAX_RATE_LIMIT_RETRIES`;
    /// - responses with status >= 500, up to `MAX_SERVER_FAIL_RETRIES`;
    /// - request timeouts, up to `MAX_TIMEOUT_RETRIES`.
    ///
    /// A response with a status below 300 is returned as `Ok`.
    ///
    /// # Errors
    ///
    /// - `MaxRateLimitRetriesReached`, `MaxServerFailedRetriesReached` or
    ///   `MaxTimeoutRetriesReached` when the corresponding retry limit is hit.
    /// - `NotFound` for a 404 response.
    /// - `RawError` for any other response with status >= 300, and for
    ///   non-timeout request errors.
    ///
    /// # Panics
    ///
    /// Panics if `method` is anything other than GET, POST, PUT or DELETE
    /// (see `simple_request`).
    pub async fn retrying_request(
        &self,
        url: &str,
        method: &reqwest::Method,
        body: &Option<Vec<u8>>,
        headers: &Option<reqwest::header::HeaderMap>,
    ) -> Result<reqwest::Response, ApifyClientError> {
        if self.debug_log {
            println!("Doing request to: {}", url);
        }
        let mut rate_limit_retry_count: u8 = 0;
        let mut server_failed_retry_count: u8 = 0;
        let mut timeout_retry_count: u8 = 0;
        loop {
            if rate_limit_retry_count >= MAX_RATE_LIMIT_RETRIES {
                return Err(ApifyClientError::MaxRateLimitRetriesReached(rate_limit_retry_count));
            }
            if server_failed_retry_count >= MAX_SERVER_FAIL_RETRIES {
                return Err(ApifyClientError::MaxServerFailedRetriesReached(server_failed_retry_count));
            }
            if timeout_retry_count >= MAX_TIMEOUT_RETRIES {
                return Err(ApifyClientError::MaxTimeoutRetriesReached(timeout_retry_count));
            }

            let resp = match self.simple_request(url, method, body, headers).await {
                Ok(resp) => resp,
                Err(err) if err.is_timeout() => {
                    timeout_retry_count += 1;
                    let delay_ms = self.retry_delay_ms(timeout_retry_count);
                    if self.debug_log {
                        println!(
                            "Request timeouted, retry n. {} will happen in: {} ms",
                            timeout_retry_count, delay_ms
                        );
                    }
                    delay_for(Duration::from_millis(u64::from(delay_ms))).await;
                    continue;
                }
                Err(err) => {
                    // Any other transport error is not retried. It is reported to the
                    // caller instead of aborting the whole program with a panic.
                    return Err(ApifyClientError::RawError(format!(
                        "Request failed: {}",
                        err
                    )));
                }
            };

            let status_code = resp.status().as_u16();

            // Transient failures: wait and try again.
            if status_code == 429 || status_code >= 500 {
                let delay_ms = if status_code == 429 {
                    rate_limit_retry_count += 1;
                    let delay_ms = self.retry_delay_ms(rate_limit_retry_count);
                    if self.debug_log {
                        println!(
                            "Request got rate limit(429), retry n. {} will happen in: {} ms",
                            rate_limit_retry_count, delay_ms
                        );
                    }
                    delay_ms
                } else {
                    server_failed_retry_count += 1;
                    let delay_ms = self.retry_delay_ms(server_failed_retry_count);
                    if self.debug_log {
                        println!(
                            "Server failed({}), retry n. {} will happen in: {} ms",
                            status_code, server_failed_retry_count, delay_ms
                        );
                    }
                    delay_ms
                };
                delay_for(Duration::from_millis(u64::from(delay_ms))).await;
                continue;
            }

            // Success route.
            if status_code < 300 {
                return Ok(resp);
            }

            // Error route: the body is expected to be a JSON error object, but a
            // malformed body must not crash the caller, so fall back to a
            // descriptive message when it cannot be parsed.
            let message = match resp.json::<ApifyApiErrorRawWrapper>().await {
                Ok(raw_error) => raw_error.error.message,
                Err(err) => format!(
                    "Request failed with status {} and the error body could not be parsed: {}",
                    status_code, err
                ),
            };
            if status_code == 404 {
                return Err(ApifyClientError::NotFound(message));
            }
            // More specific error types can be added here.
            return Err(ApifyClientError::RawError(message));
        }
    }
}