1use crate::config::ProxyConfig;
21use crate::error::{Error, Result};
22use crate::rate_limiter::RateLimiter;
23use crate::retry_strategy::{RetryConfig, RetryStrategy};
24use reqwest::{Client, Method, Response, StatusCode, header::HeaderMap};
25use serde_json::Value;
26use std::time::Duration;
27use tracing::{debug, error, info, instrument, warn};
28
29#[derive(Debug, Clone)]
31pub struct HttpConfig {
32 pub timeout: Duration,
34 pub connect_timeout: Duration,
36 #[deprecated(note = "Use retry_config instead")]
38 pub max_retries: u32,
39 pub verbose: bool,
41 pub user_agent: String,
43 pub return_response_headers: bool,
45 pub proxy: Option<ProxyConfig>,
47 pub enable_rate_limit: bool,
49 pub retry_config: Option<RetryConfig>,
51}
52
53impl Default for HttpConfig {
54 fn default() -> Self {
55 Self {
56 timeout: Duration::from_secs(30),
57 connect_timeout: Duration::from_secs(10),
58 #[allow(deprecated)]
59 max_retries: 3, verbose: false,
61 user_agent: "ccxt-rust/1.0".to_string(),
62 return_response_headers: false,
63 proxy: None,
64 enable_rate_limit: true,
65 retry_config: None, }
67 }
68}
69
70#[derive(Debug)]
72pub struct HttpClient {
73 client: Client,
74 config: HttpConfig,
75 rate_limiter: Option<RateLimiter>,
76 retry_strategy: RetryStrategy,
77}
78
79impl HttpClient {
80 pub fn new(config: HttpConfig) -> Result<Self> {
96 let mut builder = Client::builder()
97 .timeout(config.timeout)
98 .connect_timeout(config.connect_timeout)
99 .gzip(true)
100 .user_agent(&config.user_agent);
101
102 if let Some(proxy_config) = &config.proxy {
103 let mut proxy = reqwest::Proxy::all(&proxy_config.url)
104 .map_err(|e| Error::network(format!("Invalid proxy URL: {e}")))?;
105
106 if let (Some(username), Some(password)) =
107 (&proxy_config.username, &proxy_config.password)
108 {
109 proxy = proxy.basic_auth(username, password);
110 }
111 builder = builder.proxy(proxy);
112 }
113
114 let client = builder
115 .build()
116 .map_err(|e| Error::network(format!("Failed to build HTTP client: {e}")))?;
117
118 let retry_strategy = RetryStrategy::new(config.retry_config.clone().unwrap_or_default());
119
120 Ok(Self {
121 client,
122 config,
123 rate_limiter: None,
124 retry_strategy,
125 })
126 }
127
128 pub fn new_with_rate_limiter(config: HttpConfig, rate_limiter: RateLimiter) -> Result<Self> {
143 let mut client = Self::new(config)?;
144 client.rate_limiter = Some(rate_limiter);
145 Ok(client)
146 }
147
148 pub fn set_rate_limiter(&mut self, rate_limiter: RateLimiter) {
154 self.rate_limiter = Some(rate_limiter);
155 }
156
157 pub fn set_retry_strategy(&mut self, strategy: RetryStrategy) {
163 self.retry_strategy = strategy;
164 }
165
166 #[instrument(
191 name = "http_fetch",
192 skip(self, headers, body),
193 fields(method = %method, url = %url, timeout_ms = %self.config.timeout.as_millis())
194 )]
195 pub async fn fetch(
196 &self,
197 url: &str,
198 method: Method,
199 headers: Option<HeaderMap>,
200 body: Option<Value>,
201 ) -> Result<Value> {
202 #[allow(clippy::collapsible_if)]
205 if self.config.enable_rate_limit {
206 if let Some(ref limiter) = self.rate_limiter {
207 limiter.wait().await;
208 }
209 }
210
211 let total_timeout = self.config.timeout;
215 let url_for_error = url.to_string();
216
217 match tokio::time::timeout(
218 total_timeout,
219 self.execute_with_retry(|| {
220 let url = url.to_string();
221 let method = method.clone();
222 let headers = headers.clone();
223 let body = body.clone();
224 async move { self.fetch_once(&url, method, headers, body).await }
225 }),
226 )
227 .await
228 {
229 Ok(result) => result,
230 Err(_elapsed) => {
231 warn!(
233 url = %url_for_error,
234 timeout_ms = %total_timeout.as_millis(),
235 "HTTP request timed out (including retries)"
236 );
237 Err(Error::timeout(format!(
238 "Request to {} timed out after {}ms",
239 url_for_error,
240 total_timeout.as_millis()
241 )))
242 }
243 }
244 }
245
246 pub(crate) async fn execute_with_retry<F, Fut>(&self, operation: F) -> Result<Value>
248 where
249 F: Fn() -> Fut,
250 Fut: std::future::Future<Output = Result<Value>>,
251 {
252 let mut attempt = 0;
253 loop {
254 match operation().await {
255 Ok(response) => {
256 debug!(attempt = attempt + 1, "Operation completed successfully");
257 return Ok(response);
258 }
259 Err(e) => {
260 let should_retry = self.retry_strategy.should_retry(&e, attempt);
261
262 if should_retry {
263 let delay = self.retry_strategy.calculate_delay(attempt, &e);
264
265 warn!(
266 attempt = attempt + 1,
267 delay_ms = %delay.as_millis(),
268 error = %e,
269 error_debug = ?e,
270 is_retryable = e.is_retryable(),
271 "Operation failed, retrying after delay"
272 );
273
274 tokio::time::sleep(delay).await;
275 attempt += 1;
276 } else {
277 error!(
279 attempt = attempt + 1,
280 error = %e,
281 error_debug = ?e,
282 is_retryable = e.is_retryable(),
283 "Operation failed, not retrying"
284 );
285 return Err(e);
286 }
287 }
288 }
289 }
290 }
291
292 #[instrument(
294 name = "http_fetch_once",
295 skip(self, headers, body),
296 fields(method = %method, url = %url, has_body = body.is_some())
297 )]
298 async fn fetch_once(
299 &self,
300 url: &str,
301 method: Method,
302 headers: Option<HeaderMap>,
303 body: Option<Value>,
304 ) -> Result<Value> {
305 let mut request = self.client.request(method.clone(), url);
306
307 if let Some(headers) = headers {
308 request = request.headers(headers);
309 }
310
311 if let Some(ref body) = body {
312 request = request.json(&body);
313 }
314
315 if self.config.verbose {
316 if let Some(body) = &body {
317 debug!(
318 body = ?body,
319 "HTTP request with body"
320 );
321 } else {
322 debug!("HTTP request without body");
323 }
324 }
325
326 let response = request.send().await.map_err(|e| {
327 error!(
328 error = %e,
329 "HTTP request send failed"
330 );
331 Error::network(format!("Request failed: {e}"))
332 })?;
333
334 self.process_response(response).await
335 }
336
337 #[instrument(name = "http_process_response", skip(self, response), fields(status))]
339 async fn process_response(&self, response: Response) -> Result<Value> {
340 let status = response.status();
341 let headers = response.headers().clone();
342
343 tracing::Span::current().record("status", status.as_u16());
345
346 let body_text = response.text().await.map_err(|e| {
347 error!(
348 error = %e,
349 "Failed to read response body"
350 );
351 Error::network(format!("Failed to read response body: {e}"))
352 })?;
353
354 let body_preview: String = body_text.chars().take(200).collect();
356 debug!(
357 status = %status,
358 body_length = body_text.len(),
359 body_preview = %body_preview,
360 "HTTP response received"
361 );
362
363 let mut result: Value =
364 serde_json::from_str(&body_text).unwrap_or_else(|_| Value::String(body_text.clone()));
365
366 if self.config.return_response_headers
367 && let Value::Object(ref mut map) = result
368 {
369 let headers_value = headers_to_json(&headers);
370 map.insert("responseHeaders".to_string(), headers_value);
371 }
372
373 if !status.is_success() {
374 let err = Self::handle_http_error(status, &body_text, &result);
375 error!(
376 status = status.as_u16(),
377 error = %err,
378 body_preview = %body_preview,
379 "HTTP error response"
380 );
381 return Err(err);
382 }
383
384 Ok(result)
385 }
386
387 #[instrument(
389 name = "http_handle_error",
390 skip(body, result),
391 fields(status = status.as_u16())
392 )]
393 fn handle_http_error(status: StatusCode, body: &str, result: &Value) -> Error {
394 let body_preview: String = body.chars().take(200).collect();
396
397 match status {
398 StatusCode::BAD_REQUEST => {
399 info!(body_preview = %body_preview, "Bad request error");
400 Error::invalid_request(body.to_string())
401 }
402 StatusCode::UNAUTHORIZED => {
403 warn!("Authentication error: Unauthorized");
404 Error::authentication("Unauthorized")
405 }
406 StatusCode::FORBIDDEN => {
407 warn!("Authentication error: Forbidden");
408 Error::authentication("Forbidden")
409 }
410 StatusCode::NOT_FOUND => {
411 info!("Resource not found");
412 Error::invalid_request("Not found")
413 }
414 StatusCode::TOO_MANY_REQUESTS => {
415 let retry_after = if let Value::Object(map) = result {
416 if let Some(Value::Object(headers)) = map.get("responseHeaders") {
417 headers
418 .get("retry-after")
419 .and_then(|v| v.as_str())
420 .and_then(|s| s.parse::<u64>().ok())
421 } else {
422 None
423 }
424 } else {
425 None
426 };
427
428 if let Some(seconds) = retry_after {
429 warn!(
430 retry_after_seconds = seconds,
431 "Rate limit exceeded with retry-after header"
432 );
433 Error::rate_limit(
434 format!("Rate limit exceeded, retry after {seconds} seconds"),
435 Some(Duration::from_secs(seconds)),
436 )
437 } else {
438 warn!("Rate limit exceeded without retry-after header");
439 Error::rate_limit("Rate limit exceeded, please retry later", None)
440 }
441 }
442 StatusCode::INTERNAL_SERVER_ERROR => {
443 error!(body_preview = %body_preview, "Internal server error");
444 Error::exchange("500", "Internal server error")
445 }
446 StatusCode::SERVICE_UNAVAILABLE => {
447 error!(body_preview = %body_preview, "Service unavailable");
448 Error::exchange("503", "Service unavailable")
449 }
450 StatusCode::GATEWAY_TIMEOUT => {
451 error!("Gateway timeout");
452 Error::from(crate::error::NetworkError::Timeout)
453 }
454 _ => {
455 error!(
456 status = status.as_u16(),
457 body_preview = %body_preview,
458 "Unhandled HTTP error"
459 );
460 Error::network(format!("HTTP {status} error: {body}"))
461 }
462 }
463 }
464
465 #[instrument(name = "http_get", skip(self, headers), fields(url = %url))]
480 pub async fn get(&self, url: &str, headers: Option<HeaderMap>) -> Result<Value> {
481 self.fetch(url, Method::GET, headers, None).await
482 }
483
484 #[instrument(name = "http_post", skip(self, headers, body), fields(url = %url))]
500 pub async fn post(
501 &self,
502 url: &str,
503 headers: Option<HeaderMap>,
504 body: Option<Value>,
505 ) -> Result<Value> {
506 self.fetch(url, Method::POST, headers, body).await
507 }
508
509 #[instrument(name = "http_put", skip(self, headers, body), fields(url = %url))]
525 pub async fn put(
526 &self,
527 url: &str,
528 headers: Option<HeaderMap>,
529 body: Option<Value>,
530 ) -> Result<Value> {
531 self.fetch(url, Method::PUT, headers, body).await
532 }
533
534 #[instrument(name = "http_delete", skip(self, headers, body), fields(url = %url))]
550 pub async fn delete(
551 &self,
552 url: &str,
553 headers: Option<HeaderMap>,
554 body: Option<Value>,
555 ) -> Result<Value> {
556 self.fetch(url, Method::DELETE, headers, body).await
557 }
558
559 pub fn config(&self) -> &HttpConfig {
561 &self.config
562 }
563
564 pub fn set_config(&mut self, config: HttpConfig) {
570 self.config = config;
571 }
572}
573
574fn headers_to_json(headers: &HeaderMap) -> Value {
576 let mut map = serde_json::Map::new();
577 for (key, value) in headers {
578 let key_str = key.as_str().to_string();
579 let value_str = value.to_str().unwrap_or("").to_string();
580 map.insert(key_str, Value::String(value_str));
581 }
582 Value::Object(map)
583}
584
585#[cfg(test)]
586mod tests {
587 use super::*;
588
589 #[tokio::test]
590 async fn test_http_client_creation() {
591 let config = HttpConfig::default();
592 let client = HttpClient::new(config);
593 assert!(client.is_ok());
594 }
595
596 #[tokio::test]
597 async fn test_http_config_default() {
598 let config = HttpConfig::default();
599 assert_eq!(config.timeout, Duration::from_secs(30));
600 assert_eq!(config.connect_timeout, Duration::from_secs(10));
601 assert!(config.retry_config.is_none());
602 assert!(!config.verbose);
603 assert_eq!(config.user_agent, "ccxt-rust/1.0");
604 assert!(config.enable_rate_limit);
605 }
606
607 #[tokio::test]
608 async fn test_headers_to_json() {
609 let mut headers = HeaderMap::new();
610 headers.insert("Content-Type", "application/json".parse().unwrap());
611 headers.insert("X-Custom-Header", "test-value".parse().unwrap());
612
613 let json = headers_to_json(&headers);
614 assert!(json.is_object());
615
616 let obj = json.as_object().unwrap();
617 assert_eq!(obj.get("content-type").unwrap(), "application/json");
618 assert_eq!(obj.get("x-custom-header").unwrap(), "test-value");
619 }
620
621 #[tokio::test]
622 async fn test_http_client_with_proxy() {
623 use crate::config::ProxyConfig;
624
625 let config = HttpConfig {
626 proxy: Some(ProxyConfig::new("http://localhost:8080")),
627 ..Default::default()
628 };
629
630 let client = HttpClient::new(config);
632 assert!(client.is_ok());
633 }
634
635 #[tokio::test]
636 async fn test_get_request() {
637 let config = HttpConfig {
638 verbose: false,
639 ..Default::default()
640 };
641 let client = HttpClient::new(config).unwrap();
642
643 let result = client.get("https://httpbin.org/get", None).await;
645
646 match result {
649 Ok(value) => {
650 assert!(value.is_object());
651 }
652 Err(e) => {
653 warn!("Network test skipped due to: {:?}", e);
655 }
656 }
657 }
658
659 #[tokio::test]
660 async fn test_post_request() {
661 let config = HttpConfig::default();
662 let client = HttpClient::new(config).unwrap();
663
664 let body = serde_json::json!({
665 "test": "data",
666 "number": 123
667 });
668
669 let result = client
670 .post("https://httpbin.org/post", None, Some(body))
671 .await;
672
673 match result {
674 Ok(value) => {
675 assert!(value.is_object());
676 }
677 Err(e) => {
678 warn!("Network test skipped due to: {:?}", e);
679 }
680 }
681 }
682
683 #[tokio::test]
684 async fn test_http_error_handling() {
685 let config = HttpConfig::default();
686 let client = HttpClient::new(config).unwrap();
687
688 let result = client.get("https://httpbin.org/status/404", None).await;
690 assert!(result.is_err());
691
692 if let Err(e) = result {
693 match e {
694 Error::InvalidRequest(_) => {
695 }
697 Error::Exchange(_) => {
698 }
700 Error::Network(_) => {
701 }
703 _ => panic!("Unexpected error type: {:?}", e),
704 }
705 }
706 }
707
708 #[tokio::test]
709 async fn test_timeout() {
710 let config = HttpConfig {
711 timeout: Duration::from_secs(1), retry_config: Some(RetryConfig {
713 max_retries: 0,
714 ..RetryConfig::default()
715 }),
716 ..Default::default()
717 };
718 let client = HttpClient::new(config).unwrap();
719
720 let result = client.get("https://httpbin.org/delay/5", None).await;
722 assert!(result.is_err());
723
724 if let Err(e) = result {
725 match &e {
726 Error::Timeout(msg) => {
727 assert!(
729 msg.contains("httpbin.org/delay/5"),
730 "Timeout error should contain URL"
731 );
732 assert!(
733 msg.contains("1000ms"),
734 "Timeout error should contain timeout duration"
735 );
736 }
737 Error::Network(_) => {
738 }
740 _ => panic!("Expected Timeout or Network error, got: {:?}", e),
741 }
742 assert!(e.is_retryable(), "Timeout error should be retryable");
744 }
745 }
746
747 #[tokio::test]
748 async fn test_retry_mechanism() {
749 let config = HttpConfig {
750 retry_config: Some(RetryConfig {
751 max_retries: 2,
752 ..RetryConfig::default()
753 }),
754 verbose: true,
755 ..Default::default()
756 };
757 let client = HttpClient::new(config).unwrap();
758
759 let result = client.get("https://httpbin.org/status/503", None).await;
762
763 assert!(result.is_err());
765 }
766
767 #[tokio::test]
768 async fn test_rate_limiter_integration() {
769 use crate::rate_limiter::{RateLimiter, RateLimiterConfig};
770 use std::time::{Duration, Instant};
771
772 let limiter_config = RateLimiterConfig::new(5, Duration::from_secs(1));
774 let limiter = RateLimiter::new(limiter_config);
775
776 let config = HttpConfig {
777 enable_rate_limit: true,
778 verbose: false,
779 ..Default::default()
780 };
781
782 let client = HttpClient::new_with_rate_limiter(config, limiter).unwrap();
783
784 let start = Instant::now();
786
787 for _ in 0..10 {
789 let _ = client.get("https://httpbin.org/get", None).await;
790 }
791
792 let elapsed = start.elapsed();
793
794 assert!(
797 elapsed >= Duration::from_secs(1),
798 "Rate limiter should have delayed requests"
799 );
800 }
801
802 #[tokio::test]
803 async fn test_rate_limiter_disabled() {
804 let config = HttpConfig {
805 enable_rate_limit: false,
806 verbose: false,
807 ..Default::default()
808 };
809
810 let client = HttpClient::new(config).unwrap();
811
812 match client.get("https://httpbin.org/get", None).await {
814 Ok(_) => assert!(true),
815 Err(e) => {
816 warn!("Network test skipped due to: {:?}", e);
818 }
819 }
820 }
821
822 #[tokio::test]
823 async fn test_execute_with_retry_success() {
824 let config = HttpConfig::default();
825 let client = HttpClient::new(config).unwrap();
826
827 let result = client
828 .execute_with_retry(|| async { Ok(serde_json::json!({"status": "ok"})) })
829 .await;
830
831 assert!(result.is_ok());
832 assert_eq!(result.unwrap()["status"], "ok");
833 }
834
835 #[tokio::test]
836 async fn test_execute_with_retry_failure() {
837 let config = HttpConfig {
838 retry_config: Some(RetryConfig {
839 max_retries: 2,
840 base_delay_ms: 10,
841 ..RetryConfig::default()
842 }),
843 ..Default::default()
844 };
845 let client = HttpClient::new(config).unwrap();
846
847 let result = client
848 .execute_with_retry(|| async { Err(Error::network("Persistent failure")) })
849 .await;
850
851 assert!(result.is_err());
852 }
853}