ccxt_core/
http_client.rs

1//! HTTP client abstraction layer
2//!
3//! Provides a unified HTTP request interface with support for:
4//! - Automatic retry mechanism with configurable strategies
5//! - Timeout control per request
6//! - Gzip compression/decompression
7//! - Request and response logging with structured tracing
8//! - Custom headers
9//! - Proxy configuration
10//! - Rate limiting integration
11//!
12//! # Observability
13//!
14//! This module uses the `tracing` crate for structured logging. Key events:
15//! - HTTP request initiation with URL and method
16//! - Retry attempts with delay and error cause
17//! - HTTP response status and body preview
18//! - Error details with structured fields
19
20use crate::error::{Error, Result};
21use crate::rate_limiter::RateLimiter;
22use crate::retry_strategy::{RetryConfig, RetryStrategy};
23use reqwest::{Client, Method, Response, StatusCode, header::HeaderMap};
24use serde_json::Value;
25use std::time::Duration;
26use tracing::{debug, error, info, instrument, warn};
27
28/// HTTP request configuration
29#[derive(Debug, Clone)]
30pub struct HttpConfig {
31    /// Request timeout in seconds
32    pub timeout: u64,
33    /// Maximum retry attempts (deprecated, use `retry_config` instead)
34    #[deprecated(note = "Use retry_config instead")]
35    pub max_retries: u32,
36    /// Whether to enable verbose logging
37    pub verbose: bool,
38    /// Default User-Agent header value
39    pub user_agent: String,
40    /// Whether to include response headers in the result
41    pub return_response_headers: bool,
42    /// Optional proxy URL
43    pub proxy: Option<String>,
44    /// Whether to enable rate limiting
45    pub enable_rate_limit: bool,
46    /// Optional retry configuration (uses default if `None`)
47    pub retry_config: Option<RetryConfig>,
48}
49
50impl Default for HttpConfig {
51    fn default() -> Self {
52        Self {
53            timeout: 30,
54            #[allow(deprecated)]
55            max_retries: 3, // Kept for backward compatibility, but deprecated
56            verbose: false,
57            user_agent: "ccxt-rust/1.0".to_string(),
58            return_response_headers: false,
59            proxy: None,
60            enable_rate_limit: true,
61            retry_config: None, // Uses default retry configuration
62        }
63    }
64}
65
66/// HTTP client with retry and rate limiting support
67#[derive(Debug)]
68pub struct HttpClient {
69    client: Client,
70    config: HttpConfig,
71    rate_limiter: Option<RateLimiter>,
72    retry_strategy: RetryStrategy,
73}
74
75impl HttpClient {
76    /// Creates a new HTTP client with the given configuration.
77    ///
78    /// # Arguments
79    ///
80    /// * `config` - HTTP client configuration
81    ///
82    /// # Returns
83    ///
84    /// Returns a `Result` containing the initialized client or an error if client creation fails.
85    ///
86    /// # Errors
87    ///
88    /// Returns an error if:
89    /// - The proxy URL is invalid
90    /// - The HTTP client cannot be built
91    pub fn new(config: HttpConfig) -> Result<Self> {
92        let mut builder = Client::builder()
93            .timeout(Duration::from_secs(config.timeout))
94            .gzip(true)
95            .user_agent(&config.user_agent);
96
97        if let Some(proxy_url) = &config.proxy {
98            let proxy = reqwest::Proxy::all(proxy_url)
99                .map_err(|e| Error::network(format!("Invalid proxy URL: {}", e)))?;
100            builder = builder.proxy(proxy);
101        }
102
103        let client = builder
104            .build()
105            .map_err(|e| Error::network(format!("Failed to build HTTP client: {}", e)))?;
106
107        let retry_strategy = RetryStrategy::new(config.retry_config.clone().unwrap_or_default());
108
109        Ok(Self {
110            client,
111            config,
112            rate_limiter: None,
113            retry_strategy,
114        })
115    }
116
117    /// Creates a new HTTP client with a custom rate limiter.
118    ///
119    /// # Arguments
120    ///
121    /// * `config` - HTTP client configuration
122    /// * `rate_limiter` - Pre-configured rate limiter instance
123    ///
124    /// # Returns
125    ///
126    /// Returns a `Result` containing the initialized client with rate limiter.
127    ///
128    /// # Errors
129    ///
130    /// Returns an error if client creation fails.
131    pub fn new_with_rate_limiter(config: HttpConfig, rate_limiter: RateLimiter) -> Result<Self> {
132        let mut client = Self::new(config)?;
133        client.rate_limiter = Some(rate_limiter);
134        Ok(client)
135    }
136
137    /// Sets the rate limiter for this client.
138    ///
139    /// # Arguments
140    ///
141    /// * `rate_limiter` - Rate limiter instance to use
142    pub fn set_rate_limiter(&mut self, rate_limiter: RateLimiter) {
143        self.rate_limiter = Some(rate_limiter);
144    }
145
146    /// Sets the retry strategy for this client.
147    ///
148    /// # Arguments
149    ///
150    /// * `strategy` - Retry strategy to use for failed requests
151    pub fn set_retry_strategy(&mut self, strategy: RetryStrategy) {
152        self.retry_strategy = strategy;
153    }
154
155    /// Executes an HTTP request with automatic retry mechanism.
156    ///
157    /// # Arguments
158    ///
159    /// * `url` - Target URL for the request
160    /// * `method` - HTTP method to use
161    /// * `headers` - Optional custom headers
162    /// * `body` - Optional request body as JSON
163    ///
164    /// # Returns
165    ///
166    /// Returns the response body as a JSON `Value`.
167    ///
168    /// # Errors
169    ///
170    /// Returns an error if:
171    /// - All retry attempts fail
172    /// - The server returns an error status code
173    /// - Network communication fails
174    #[instrument(
175        name = "http_fetch",
176        skip(self, headers, body),
177        fields(method = %method, url = %url)
178    )]
179    pub async fn fetch(
180        &self,
181        url: &str,
182        method: Method,
183        headers: Option<HeaderMap>,
184        body: Option<Value>,
185    ) -> Result<Value> {
186        if self.config.enable_rate_limit {
187            if let Some(ref limiter) = self.rate_limiter {
188                limiter.wait().await;
189            }
190        }
191
192        let mut attempt = 0;
193        loop {
194            match self
195                .fetch_once(url, method.clone(), headers.clone(), body.clone())
196                .await
197            {
198                Ok(response) => {
199                    debug!(attempt = attempt + 1, "HTTP request completed successfully");
200                    return Ok(response);
201                }
202                Err(e) => {
203                    let should_retry = self.retry_strategy.should_retry(&e, attempt);
204
205                    if should_retry {
206                        let delay = self.retry_strategy.calculate_delay(attempt, &e);
207
208                        warn!(
209                            attempt = attempt + 1,
210                            delay_ms = %delay.as_millis(),
211                            error = %e,
212                            error_debug = ?e,
213                            is_retryable = e.is_retryable(),
214                            "HTTP request failed, retrying after delay"
215                        );
216
217                        tokio::time::sleep(delay).await;
218                        attempt += 1;
219                    } else {
220                        // Not retrying, log the final error and return
221                        error!(
222                            attempt = attempt + 1,
223                            error = %e,
224                            error_debug = ?e,
225                            is_retryable = e.is_retryable(),
226                            "HTTP request failed, not retrying"
227                        );
228                        return Err(e);
229                    }
230                }
231            }
232        }
233    }
234
235    /// Executes a single HTTP request without retry logic (internal method).
236    #[instrument(
237        name = "http_fetch_once",
238        skip(self, headers, body),
239        fields(method = %method, url = %url, has_body = body.is_some())
240    )]
241    async fn fetch_once(
242        &self,
243        url: &str,
244        method: Method,
245        headers: Option<HeaderMap>,
246        body: Option<Value>,
247    ) -> Result<Value> {
248        let mut request = self.client.request(method.clone(), url);
249
250        if let Some(headers) = headers {
251            request = request.headers(headers);
252        }
253
254        if let Some(ref body) = body {
255            request = request.json(&body);
256        }
257
258        if self.config.verbose {
259            if let Some(body) = &body {
260                debug!(
261                    body = ?body,
262                    "HTTP request with body"
263                );
264            } else {
265                debug!("HTTP request without body");
266            }
267        }
268
269        let response = request.send().await.map_err(|e| {
270            error!(
271                error = %e,
272                "HTTP request send failed"
273            );
274            Error::network(format!("Request failed: {}", e))
275        })?;
276
277        self.process_response(response).await
278    }
279
280    /// Processes an HTTP response, handling errors and parsing JSON.
281    #[instrument(name = "http_process_response", skip(self, response), fields(status))]
282    async fn process_response(&self, response: Response) -> Result<Value> {
283        let status = response.status();
284        let headers = response.headers().clone();
285
286        // Record the status in the span
287        tracing::Span::current().record("status", status.as_u16());
288
289        let body_text = response.text().await.map_err(|e| {
290            error!(
291                error = %e,
292                "Failed to read response body"
293            );
294            Error::network(format!("Failed to read response body: {}", e))
295        })?;
296
297        // Log response details (truncate body preview for large responses)
298        let body_preview: String = body_text.chars().take(200).collect();
299        debug!(
300            status = %status,
301            body_length = body_text.len(),
302            body_preview = %body_preview,
303            "HTTP response received"
304        );
305
306        let mut result: Value =
307            serde_json::from_str(&body_text).unwrap_or_else(|_| Value::String(body_text.clone()));
308
309        if self.config.return_response_headers {
310            if let Value::Object(ref mut map) = result {
311                let headers_value = headers_to_json(&headers);
312                map.insert("responseHeaders".to_string(), headers_value);
313            }
314        }
315
316        if !status.is_success() {
317            let err = self.handle_http_error(status, &body_text, result);
318            error!(
319                status = status.as_u16(),
320                error = %err,
321                body_preview = %body_preview,
322                "HTTP error response"
323            );
324            return Err(err);
325        }
326
327        Ok(result)
328    }
329
330    /// Handles HTTP error status codes and converts them to appropriate errors.
331    #[instrument(
332        name = "http_handle_error",
333        skip(self, body, result),
334        fields(status = status.as_u16())
335    )]
336    fn handle_http_error(&self, status: StatusCode, body: &str, result: Value) -> Error {
337        // Truncate body for logging to avoid excessive log sizes
338        let body_preview: String = body.chars().take(200).collect();
339
340        match status {
341            StatusCode::BAD_REQUEST => {
342                info!(body_preview = %body_preview, "Bad request error");
343                Error::invalid_request(body.to_string())
344            }
345            StatusCode::UNAUTHORIZED => {
346                warn!("Authentication error: Unauthorized");
347                Error::authentication("Unauthorized")
348            }
349            StatusCode::FORBIDDEN => {
350                warn!("Authentication error: Forbidden");
351                Error::authentication("Forbidden")
352            }
353            StatusCode::NOT_FOUND => {
354                info!("Resource not found");
355                Error::invalid_request("Not found")
356            }
357            StatusCode::TOO_MANY_REQUESTS => {
358                let retry_after = if let Value::Object(ref map) = result {
359                    if let Some(Value::Object(headers)) = map.get("responseHeaders") {
360                        headers
361                            .get("retry-after")
362                            .and_then(|v| v.as_str())
363                            .and_then(|s| s.parse::<u64>().ok())
364                    } else {
365                        None
366                    }
367                } else {
368                    None
369                };
370
371                if let Some(seconds) = retry_after {
372                    warn!(
373                        retry_after_seconds = seconds,
374                        "Rate limit exceeded with retry-after header"
375                    );
376                    Error::rate_limit(
377                        format!("Rate limit exceeded, retry after {} seconds", seconds),
378                        Some(Duration::from_secs(seconds)),
379                    )
380                } else {
381                    warn!("Rate limit exceeded without retry-after header");
382                    Error::rate_limit("Rate limit exceeded, please retry later", None)
383                }
384            }
385            StatusCode::INTERNAL_SERVER_ERROR => {
386                error!(body_preview = %body_preview, "Internal server error");
387                Error::exchange("500", "Internal server error")
388            }
389            StatusCode::SERVICE_UNAVAILABLE => {
390                error!(body_preview = %body_preview, "Service unavailable");
391                Error::exchange("503", "Service unavailable")
392            }
393            StatusCode::GATEWAY_TIMEOUT => {
394                error!("Gateway timeout");
395                Error::from(crate::error::NetworkError::Timeout)
396            }
397            _ => {
398                error!(
399                    status = status.as_u16(),
400                    body_preview = %body_preview,
401                    "Unhandled HTTP error"
402                );
403                Error::network(format!("HTTP {} error: {}", status, body))
404            }
405        }
406    }
407
408    /// Executes a GET request.
409    ///
410    /// # Arguments
411    ///
412    /// * `url` - Target URL
413    /// * `headers` - Optional custom headers
414    ///
415    /// # Returns
416    ///
417    /// Returns the response body as a JSON `Value`.
418    ///
419    /// # Errors
420    ///
421    /// Returns an error if the request fails.
422    #[instrument(name = "http_get", skip(self, headers), fields(url = %url))]
423    pub async fn get(&self, url: &str, headers: Option<HeaderMap>) -> Result<Value> {
424        self.fetch(url, Method::GET, headers, None).await
425    }
426
427    /// Executes a POST request.
428    ///
429    /// # Arguments
430    ///
431    /// * `url` - Target URL
432    /// * `headers` - Optional custom headers
433    /// * `body` - Optional request body as JSON
434    ///
435    /// # Returns
436    ///
437    /// Returns the response body as a JSON `Value`.
438    ///
439    /// # Errors
440    ///
441    /// Returns an error if the request fails.
442    #[instrument(name = "http_post", skip(self, headers, body), fields(url = %url))]
443    pub async fn post(
444        &self,
445        url: &str,
446        headers: Option<HeaderMap>,
447        body: Option<Value>,
448    ) -> Result<Value> {
449        self.fetch(url, Method::POST, headers, body).await
450    }
451
452    /// Executes a PUT request.
453    ///
454    /// # Arguments
455    ///
456    /// * `url` - Target URL
457    /// * `headers` - Optional custom headers
458    /// * `body` - Optional request body as JSON
459    ///
460    /// # Returns
461    ///
462    /// Returns the response body as a JSON `Value`.
463    ///
464    /// # Errors
465    ///
466    /// Returns an error if the request fails.
467    #[instrument(name = "http_put", skip(self, headers, body), fields(url = %url))]
468    pub async fn put(
469        &self,
470        url: &str,
471        headers: Option<HeaderMap>,
472        body: Option<Value>,
473    ) -> Result<Value> {
474        self.fetch(url, Method::PUT, headers, body).await
475    }
476
477    /// Executes a DELETE request.
478    ///
479    /// # Arguments
480    ///
481    /// * `url` - Target URL
482    /// * `headers` - Optional custom headers
483    /// * `body` - Optional request body as JSON
484    ///
485    /// # Returns
486    ///
487    /// Returns the response body as a JSON `Value`.
488    ///
489    /// # Errors
490    ///
491    /// Returns an error if the request fails.
492    #[instrument(name = "http_delete", skip(self, headers, body), fields(url = %url))]
493    pub async fn delete(
494        &self,
495        url: &str,
496        headers: Option<HeaderMap>,
497        body: Option<Value>,
498    ) -> Result<Value> {
499        self.fetch(url, Method::DELETE, headers, body).await
500    }
501
502    /// Returns a reference to the current HTTP configuration.
503    pub fn config(&self) -> &HttpConfig {
504        &self.config
505    }
506
507    /// Updates the HTTP configuration.
508    ///
509    /// # Arguments
510    ///
511    /// * `config` - New configuration to use
512    pub fn set_config(&mut self, config: HttpConfig) {
513        self.config = config;
514    }
515}
516
517/// Converts a `HeaderMap` to a JSON `Value`.
518fn headers_to_json(headers: &HeaderMap) -> Value {
519    let mut map = serde_json::Map::new();
520    for (key, value) in headers.iter() {
521        let key_str = key.as_str().to_string();
522        let value_str = value.to_str().unwrap_or("").to_string();
523        map.insert(key_str, Value::String(value_str));
524    }
525    Value::Object(map)
526}
527
528#[cfg(test)]
529mod tests {
530    use super::*;
531
532    #[tokio::test]
533    async fn test_http_client_creation() {
534        let config = HttpConfig::default();
535        let client = HttpClient::new(config);
536        assert!(client.is_ok());
537    }
538
539    #[tokio::test]
540    async fn test_http_config_default() {
541        let config = HttpConfig::default();
542        assert_eq!(config.timeout, 30);
543        assert!(config.retry_config.is_none());
544        assert!(!config.verbose);
545        assert_eq!(config.user_agent, "ccxt-rust/1.0");
546        assert!(config.enable_rate_limit);
547    }
548
549    #[tokio::test]
550    async fn test_headers_to_json() {
551        let mut headers = HeaderMap::new();
552        headers.insert("Content-Type", "application/json".parse().unwrap());
553        headers.insert("X-Custom-Header", "test-value".parse().unwrap());
554
555        let json = headers_to_json(&headers);
556        assert!(json.is_object());
557
558        let obj = json.as_object().unwrap();
559        assert_eq!(obj.get("content-type").unwrap(), "application/json");
560        assert_eq!(obj.get("x-custom-header").unwrap(), "test-value");
561    }
562
563    #[tokio::test]
564    async fn test_http_client_with_proxy() {
565        let config = HttpConfig {
566            proxy: Some("http://localhost:8080".to_string()),
567            ..Default::default()
568        };
569
570        // Note: This test may fail if no actual proxy server is available
571        let client = HttpClient::new(config);
572        assert!(client.is_ok());
573    }
574
575    #[tokio::test]
576    async fn test_get_request() {
577        let config = HttpConfig {
578            verbose: false,
579            ..Default::default()
580        };
581        let client = HttpClient::new(config).unwrap();
582
583        // Test using a public API (httpbin.org)
584        let result = client.get("https://httpbin.org/get", None).await;
585
586        // Since this depends on external service, we only check if result is not an error
587        // In production, you may want to mock this request
588        match result {
589            Ok(value) => {
590                assert!(value.is_object());
591            }
592            Err(e) => {
593                // Network errors are acceptable (e.g., in CI environments)
594                warn!("Network test skipped due to: {:?}", e);
595            }
596        }
597    }
598
599    #[tokio::test]
600    async fn test_post_request() {
601        let config = HttpConfig::default();
602        let client = HttpClient::new(config).unwrap();
603
604        let body = serde_json::json!({
605            "test": "data",
606            "number": 123
607        });
608
609        let result = client
610            .post("https://httpbin.org/post", None, Some(body))
611            .await;
612
613        match result {
614            Ok(value) => {
615                assert!(value.is_object());
616            }
617            Err(e) => {
618                warn!("Network test skipped due to: {:?}", e);
619            }
620        }
621    }
622
623    #[tokio::test]
624    async fn test_http_error_handling() {
625        let config = HttpConfig::default();
626        let client = HttpClient::new(config).unwrap();
627
628        // Test 404 error
629        let result = client.get("https://httpbin.org/status/404", None).await;
630        assert!(result.is_err());
631
632        if let Err(e) = result {
633            match e {
634                Error::InvalidRequest(_) => {
635                    // Expected error type for 404
636                }
637                Error::Exchange(_) => {
638                    // May get 5xx errors from httpbin.org when service is unstable
639                }
640                Error::Network(_) => {
641                    // May get network errors when service is unavailable
642                }
643                _ => panic!("Unexpected error type: {:?}", e),
644            }
645        }
646    }
647
648    #[tokio::test]
649    async fn test_timeout() {
650        let config = HttpConfig {
651            timeout: 1, // 1 second timeout
652            retry_config: Some(RetryConfig {
653                max_retries: 0,
654                ..RetryConfig::default()
655            }),
656            ..Default::default()
657        };
658        let client = HttpClient::new(config).unwrap();
659
660        // Use an endpoint that will delay
661        let result = client.get("https://httpbin.org/delay/5", None).await;
662        assert!(result.is_err());
663
664        if let Err(e) = result {
665            match e {
666                Error::Network(_) => {
667                    // Expected timeout error
668                }
669                _ => panic!("Expected Network error, got: {:?}", e),
670            }
671        }
672    }
673
674    #[tokio::test]
675    async fn test_retry_mechanism() {
676        let config = HttpConfig {
677            retry_config: Some(RetryConfig {
678                max_retries: 2,
679                ..RetryConfig::default()
680            }),
681            verbose: true,
682            ..Default::default()
683        };
684        let client = HttpClient::new(config).unwrap();
685
686        // Test retry mechanism (using a potentially unstable endpoint)
687        // Note: This test depends on external service
688        let result = client.get("https://httpbin.org/status/503", None).await;
689
690        // Should still fail after retries
691        assert!(result.is_err());
692    }
693
694    #[tokio::test]
695    async fn test_rate_limiter_integration() {
696        use crate::rate_limiter::{RateLimiter, RateLimiterConfig};
697        use std::time::{Duration, Instant};
698
699        // Create rate limiter: 5 requests per second
700        let limiter_config = RateLimiterConfig::new(5, Duration::from_secs(1));
701        let limiter = RateLimiter::new(limiter_config);
702
703        let config = HttpConfig {
704            enable_rate_limit: true,
705            verbose: false,
706            ..Default::default()
707        };
708
709        let client = HttpClient::new_with_rate_limiter(config, limiter).unwrap();
710
711        // Test if rate limiting is effective
712        let start = Instant::now();
713
714        // Send 10 requests rapidly (should be rate limited)
715        for _ in 0..10 {
716            let _ = client.get("https://httpbin.org/get", None).await;
717        }
718
719        let elapsed = start.elapsed();
720
721        // 10 requests at 5 req/sec should take at least 2 seconds
722        // But due to network latency, we only check if it exceeds 1 second
723        assert!(
724            elapsed >= Duration::from_secs(1),
725            "Rate limiter should have delayed requests"
726        );
727    }
728
729    #[tokio::test]
730    async fn test_rate_limiter_disabled() {
731        let config = HttpConfig {
732            enable_rate_limit: false,
733            verbose: false,
734            ..Default::default()
735        };
736
737        let client = HttpClient::new(config).unwrap();
738
739        // Client should work normally when rate limiting is disabled
740        match client.get("https://httpbin.org/get", None).await {
741            Ok(_) => assert!(true),
742            Err(e) => {
743                // Network errors are acceptable
744                warn!("Network test skipped due to: {:?}", e);
745            }
746        }
747    }
748}