Skip to main content

ccxt_core/http_client/
request.rs

1use crate::error::{Error, Result};
2use reqwest::{Method, header::HeaderMap};
3use serde_json::Value;
4use tracing::{debug, error, instrument, warn};
5
6use super::builder::HttpClient;
7
8impl HttpClient {
9    /// Executes an HTTP request with automatic retry mechanism and timeout control.
10    ///
11    /// The entire retry flow is wrapped with `tokio::time::timeout` to ensure
12    /// that the total operation time (including all retries) does not exceed
13    /// the configured timeout.
14    ///
15    /// # Arguments
16    ///
17    /// * `url` - Target URL for the request
18    /// * `method` - HTTP method to use
19    /// * `headers` - Optional custom headers
20    /// * `body` - Optional request body as JSON
21    ///
22    /// # Returns
23    ///
24    /// Returns the response body as a JSON `Value`.
25    ///
26    /// # Errors
27    ///
28    /// Returns an error if:
29    /// - The operation times out (including all retries)
30    /// - All retry attempts fail
31    /// - The server returns an error status code
32    /// - Network communication fails
33    #[instrument(
34        name = "http_fetch",
35        skip(self, headers, body),
36        fields(method = %method, url = %url, timeout_ms = %self.config().timeout.as_millis())
37    )]
38    pub async fn fetch(
39        &self,
40        url: &str,
41        method: Method,
42        headers: Option<HeaderMap>,
43        body: Option<Value>,
44    ) -> Result<Value> {
45        if let Some(cb) = self.circuit_breaker() {
46            cb.allow_request()?;
47        }
48
49        #[allow(clippy::collapsible_if)]
50        if self.config().enable_rate_limit {
51            if let Some(limiter) = self.rate_limiter() {
52                limiter.wait().await;
53            }
54        }
55
56        let total_timeout = self.config().timeout;
57        let url_for_error = url.to_string();
58
59        let result = match tokio::time::timeout(
60            total_timeout,
61            self.execute_with_retry(|| {
62                let url = url.to_string();
63                let method = method.clone();
64                let headers = headers.clone();
65                let body = body.clone();
66                async move { self.fetch_once(&url, method, headers, body).await }
67            }),
68        )
69        .await
70        {
71            Ok(result) => result,
72            Err(_elapsed) => {
73                warn!(
74                    url = %url_for_error,
75                    timeout_ms = %total_timeout.as_millis(),
76                    "HTTP request timed out (including retries)"
77                );
78                Err(Error::timeout(format!(
79                    "Request to {} timed out after {}ms",
80                    url_for_error,
81                    total_timeout.as_millis()
82                )))
83            }
84        };
85
86        if let Some(cb) = self.circuit_breaker() {
87            match &result {
88                Ok(_) => cb.record_success(),
89                Err(_) => cb.record_failure(),
90            }
91        }
92
93        result
94    }
95
96    #[instrument(
97        name = "http_fetch_once",
98        skip(self, headers, body),
99        fields(method = %method, url = %url, has_body = body.is_some())
100    )]
101    async fn fetch_once(
102        &self,
103        url: &str,
104        method: Method,
105        headers: Option<HeaderMap>,
106        body: Option<Value>,
107    ) -> Result<Value> {
108        let mut request = self.client().request(method.clone(), url);
109
110        if let Some(headers) = headers {
111            request = request.headers(headers);
112        }
113
114        if let Some(ref body) = body {
115            let body_str = serde_json::to_string(body)
116                .map_err(|e| Error::invalid_request(format!("JSON serialization failed: {e}")))?;
117
118            if body_str.len() > self.config().max_request_size {
119                return Err(Error::invalid_request(format!(
120                    "Request body {} bytes exceeds limit {} bytes",
121                    body_str.len(),
122                    self.config().max_request_size
123                )));
124            }
125
126            request = request.body(body_str);
127        }
128
129        if self.config().verbose {
130            if let Some(body) = &body {
131                debug!(
132                    body = ?body,
133                    "HTTP request with body"
134                );
135            } else {
136                debug!("HTTP request without body");
137            }
138        }
139
140        let response = request.send().await.map_err(|e| {
141            error!(
142                error = %e,
143                "HTTP request send failed"
144            );
145            Error::network(format!("Request failed: {e}"))
146        })?;
147
148        self.process_response_with_limit(response, url).await
149    }
150
151    /// Executes a GET request.
152    ///
153    /// # Arguments
154    ///
155    /// * `url` - Target URL
156    /// * `headers` - Optional custom headers
157    ///
158    /// # Returns
159    ///
160    /// Returns the response body as a JSON `Value`.
161    ///
162    /// # Errors
163    ///
164    /// Returns an error if the request fails.
165    #[instrument(name = "http_get", skip(self, headers), fields(url = %url))]
166    pub async fn get(&self, url: &str, headers: Option<HeaderMap>) -> Result<Value> {
167        self.fetch(url, Method::GET, headers, None).await
168    }
169
170    /// Executes a POST request.
171    ///
172    /// # Arguments
173    ///
174    /// * `url` - Target URL
175    /// * `headers` - Optional custom headers
176    /// * `body` - Optional request body as JSON
177    ///
178    /// # Returns
179    ///
180    /// Returns the response body as a JSON `Value`.
181    ///
182    /// # Errors
183    ///
184    /// Returns an error if the request fails.
185    #[instrument(name = "http_post", skip(self, headers, body), fields(url = %url))]
186    pub async fn post(
187        &self,
188        url: &str,
189        headers: Option<HeaderMap>,
190        body: Option<Value>,
191    ) -> Result<Value> {
192        self.fetch(url, Method::POST, headers, body).await
193    }
194
195    /// Executes a PUT request.
196    ///
197    /// # Arguments
198    ///
199    /// * `url` - Target URL
200    /// * `headers` - Optional custom headers
201    /// * `body` - Optional request body as JSON
202    ///
203    /// # Returns
204    ///
205    /// Returns the response body as a JSON `Value`.
206    ///
207    /// # Errors
208    ///
209    /// Returns an error if the request fails.
210    #[instrument(name = "http_put", skip(self, headers, body), fields(url = %url))]
211    pub async fn put(
212        &self,
213        url: &str,
214        headers: Option<HeaderMap>,
215        body: Option<Value>,
216    ) -> Result<Value> {
217        self.fetch(url, Method::PUT, headers, body).await
218    }
219
220    /// Executes a DELETE request.
221    ///
222    /// # Arguments
223    ///
224    /// * `url` - Target URL
225    /// * `headers` - Optional custom headers
226    /// * `body` - Optional request body as JSON
227    ///
228    /// # Returns
229    ///
230    /// Returns the response body as a JSON `Value`.
231    ///
232    /// # Errors
233    ///
234    /// Returns an error if the request fails.
235    #[instrument(name = "http_delete", skip(self, headers, body), fields(url = %url))]
236    pub async fn delete(
237        &self,
238        url: &str,
239        headers: Option<HeaderMap>,
240        body: Option<Value>,
241    ) -> Result<Value> {
242        self.fetch(url, Method::DELETE, headers, body).await
243    }
244}