1use crate::error::{Error, Result};
21use crate::rate_limiter::RateLimiter;
22use crate::retry_strategy::{RetryConfig, RetryStrategy};
23use reqwest::{Client, Method, Response, StatusCode, header::HeaderMap};
24use serde_json::Value;
25use std::time::Duration;
26use tracing::{debug, error, info, instrument, warn};
27
28#[derive(Debug, Clone)]
30pub struct HttpConfig {
31 pub timeout: u64,
33 #[deprecated(note = "Use retry_config instead")]
35 pub max_retries: u32,
36 pub verbose: bool,
38 pub user_agent: String,
40 pub return_response_headers: bool,
42 pub proxy: Option<String>,
44 pub enable_rate_limit: bool,
46 pub retry_config: Option<RetryConfig>,
48}
49
50impl Default for HttpConfig {
51 fn default() -> Self {
52 Self {
53 timeout: 30,
54 #[allow(deprecated)]
55 max_retries: 3, verbose: false,
57 user_agent: "ccxt-rust/1.0".to_string(),
58 return_response_headers: false,
59 proxy: None,
60 enable_rate_limit: true,
61 retry_config: None, }
63 }
64}
65
66#[derive(Debug)]
68pub struct HttpClient {
69 client: Client,
70 config: HttpConfig,
71 rate_limiter: Option<RateLimiter>,
72 retry_strategy: RetryStrategy,
73}
74
75impl HttpClient {
76 pub fn new(config: HttpConfig) -> Result<Self> {
92 let mut builder = Client::builder()
93 .timeout(Duration::from_secs(config.timeout))
94 .gzip(true)
95 .user_agent(&config.user_agent);
96
97 if let Some(proxy_url) = &config.proxy {
98 let proxy = reqwest::Proxy::all(proxy_url)
99 .map_err(|e| Error::network(format!("Invalid proxy URL: {}", e)))?;
100 builder = builder.proxy(proxy);
101 }
102
103 let client = builder
104 .build()
105 .map_err(|e| Error::network(format!("Failed to build HTTP client: {}", e)))?;
106
107 let retry_strategy = RetryStrategy::new(config.retry_config.clone().unwrap_or_default());
108
109 Ok(Self {
110 client,
111 config,
112 rate_limiter: None,
113 retry_strategy,
114 })
115 }
116
117 pub fn new_with_rate_limiter(config: HttpConfig, rate_limiter: RateLimiter) -> Result<Self> {
132 let mut client = Self::new(config)?;
133 client.rate_limiter = Some(rate_limiter);
134 Ok(client)
135 }
136
137 pub fn set_rate_limiter(&mut self, rate_limiter: RateLimiter) {
143 self.rate_limiter = Some(rate_limiter);
144 }
145
146 pub fn set_retry_strategy(&mut self, strategy: RetryStrategy) {
152 self.retry_strategy = strategy;
153 }
154
155 #[instrument(
175 name = "http_fetch",
176 skip(self, headers, body),
177 fields(method = %method, url = %url)
178 )]
179 pub async fn fetch(
180 &self,
181 url: &str,
182 method: Method,
183 headers: Option<HeaderMap>,
184 body: Option<Value>,
185 ) -> Result<Value> {
186 if self.config.enable_rate_limit {
187 if let Some(ref limiter) = self.rate_limiter {
188 limiter.wait().await;
189 }
190 }
191
192 let mut attempt = 0;
193 loop {
194 match self
195 .fetch_once(url, method.clone(), headers.clone(), body.clone())
196 .await
197 {
198 Ok(response) => {
199 debug!(attempt = attempt + 1, "HTTP request completed successfully");
200 return Ok(response);
201 }
202 Err(e) => {
203 let should_retry = self.retry_strategy.should_retry(&e, attempt);
204
205 if should_retry {
206 let delay = self.retry_strategy.calculate_delay(attempt, &e);
207
208 warn!(
209 attempt = attempt + 1,
210 delay_ms = %delay.as_millis(),
211 error = %e,
212 error_debug = ?e,
213 is_retryable = e.is_retryable(),
214 "HTTP request failed, retrying after delay"
215 );
216
217 tokio::time::sleep(delay).await;
218 attempt += 1;
219 } else {
220 error!(
222 attempt = attempt + 1,
223 error = %e,
224 error_debug = ?e,
225 is_retryable = e.is_retryable(),
226 "HTTP request failed, not retrying"
227 );
228 return Err(e);
229 }
230 }
231 }
232 }
233 }
234
235 #[instrument(
237 name = "http_fetch_once",
238 skip(self, headers, body),
239 fields(method = %method, url = %url, has_body = body.is_some())
240 )]
241 async fn fetch_once(
242 &self,
243 url: &str,
244 method: Method,
245 headers: Option<HeaderMap>,
246 body: Option<Value>,
247 ) -> Result<Value> {
248 let mut request = self.client.request(method.clone(), url);
249
250 if let Some(headers) = headers {
251 request = request.headers(headers);
252 }
253
254 if let Some(ref body) = body {
255 request = request.json(&body);
256 }
257
258 if self.config.verbose {
259 if let Some(body) = &body {
260 debug!(
261 body = ?body,
262 "HTTP request with body"
263 );
264 } else {
265 debug!("HTTP request without body");
266 }
267 }
268
269 let response = request.send().await.map_err(|e| {
270 error!(
271 error = %e,
272 "HTTP request send failed"
273 );
274 Error::network(format!("Request failed: {}", e))
275 })?;
276
277 self.process_response(response).await
278 }
279
280 #[instrument(name = "http_process_response", skip(self, response), fields(status))]
282 async fn process_response(&self, response: Response) -> Result<Value> {
283 let status = response.status();
284 let headers = response.headers().clone();
285
286 tracing::Span::current().record("status", status.as_u16());
288
289 let body_text = response.text().await.map_err(|e| {
290 error!(
291 error = %e,
292 "Failed to read response body"
293 );
294 Error::network(format!("Failed to read response body: {}", e))
295 })?;
296
297 let body_preview: String = body_text.chars().take(200).collect();
299 debug!(
300 status = %status,
301 body_length = body_text.len(),
302 body_preview = %body_preview,
303 "HTTP response received"
304 );
305
306 let mut result: Value =
307 serde_json::from_str(&body_text).unwrap_or_else(|_| Value::String(body_text.clone()));
308
309 if self.config.return_response_headers {
310 if let Value::Object(ref mut map) = result {
311 let headers_value = headers_to_json(&headers);
312 map.insert("responseHeaders".to_string(), headers_value);
313 }
314 }
315
316 if !status.is_success() {
317 let err = self.handle_http_error(status, &body_text, result);
318 error!(
319 status = status.as_u16(),
320 error = %err,
321 body_preview = %body_preview,
322 "HTTP error response"
323 );
324 return Err(err);
325 }
326
327 Ok(result)
328 }
329
330 #[instrument(
332 name = "http_handle_error",
333 skip(self, body, result),
334 fields(status = status.as_u16())
335 )]
336 fn handle_http_error(&self, status: StatusCode, body: &str, result: Value) -> Error {
337 let body_preview: String = body.chars().take(200).collect();
339
340 match status {
341 StatusCode::BAD_REQUEST => {
342 info!(body_preview = %body_preview, "Bad request error");
343 Error::invalid_request(body.to_string())
344 }
345 StatusCode::UNAUTHORIZED => {
346 warn!("Authentication error: Unauthorized");
347 Error::authentication("Unauthorized")
348 }
349 StatusCode::FORBIDDEN => {
350 warn!("Authentication error: Forbidden");
351 Error::authentication("Forbidden")
352 }
353 StatusCode::NOT_FOUND => {
354 info!("Resource not found");
355 Error::invalid_request("Not found")
356 }
357 StatusCode::TOO_MANY_REQUESTS => {
358 let retry_after = if let Value::Object(ref map) = result {
359 if let Some(Value::Object(headers)) = map.get("responseHeaders") {
360 headers
361 .get("retry-after")
362 .and_then(|v| v.as_str())
363 .and_then(|s| s.parse::<u64>().ok())
364 } else {
365 None
366 }
367 } else {
368 None
369 };
370
371 if let Some(seconds) = retry_after {
372 warn!(
373 retry_after_seconds = seconds,
374 "Rate limit exceeded with retry-after header"
375 );
376 Error::rate_limit(
377 format!("Rate limit exceeded, retry after {} seconds", seconds),
378 Some(Duration::from_secs(seconds)),
379 )
380 } else {
381 warn!("Rate limit exceeded without retry-after header");
382 Error::rate_limit("Rate limit exceeded, please retry later", None)
383 }
384 }
385 StatusCode::INTERNAL_SERVER_ERROR => {
386 error!(body_preview = %body_preview, "Internal server error");
387 Error::exchange("500", "Internal server error")
388 }
389 StatusCode::SERVICE_UNAVAILABLE => {
390 error!(body_preview = %body_preview, "Service unavailable");
391 Error::exchange("503", "Service unavailable")
392 }
393 StatusCode::GATEWAY_TIMEOUT => {
394 error!("Gateway timeout");
395 Error::from(crate::error::NetworkError::Timeout)
396 }
397 _ => {
398 error!(
399 status = status.as_u16(),
400 body_preview = %body_preview,
401 "Unhandled HTTP error"
402 );
403 Error::network(format!("HTTP {} error: {}", status, body))
404 }
405 }
406 }
407
408 #[instrument(name = "http_get", skip(self, headers), fields(url = %url))]
423 pub async fn get(&self, url: &str, headers: Option<HeaderMap>) -> Result<Value> {
424 self.fetch(url, Method::GET, headers, None).await
425 }
426
427 #[instrument(name = "http_post", skip(self, headers, body), fields(url = %url))]
443 pub async fn post(
444 &self,
445 url: &str,
446 headers: Option<HeaderMap>,
447 body: Option<Value>,
448 ) -> Result<Value> {
449 self.fetch(url, Method::POST, headers, body).await
450 }
451
452 #[instrument(name = "http_put", skip(self, headers, body), fields(url = %url))]
468 pub async fn put(
469 &self,
470 url: &str,
471 headers: Option<HeaderMap>,
472 body: Option<Value>,
473 ) -> Result<Value> {
474 self.fetch(url, Method::PUT, headers, body).await
475 }
476
477 #[instrument(name = "http_delete", skip(self, headers, body), fields(url = %url))]
493 pub async fn delete(
494 &self,
495 url: &str,
496 headers: Option<HeaderMap>,
497 body: Option<Value>,
498 ) -> Result<Value> {
499 self.fetch(url, Method::DELETE, headers, body).await
500 }
501
502 pub fn config(&self) -> &HttpConfig {
504 &self.config
505 }
506
507 pub fn set_config(&mut self, config: HttpConfig) {
513 self.config = config;
514 }
515}
516
517fn headers_to_json(headers: &HeaderMap) -> Value {
519 let mut map = serde_json::Map::new();
520 for (key, value) in headers.iter() {
521 let key_str = key.as_str().to_string();
522 let value_str = value.to_str().unwrap_or("").to_string();
523 map.insert(key_str, Value::String(value_str));
524 }
525 Value::Object(map)
526}
527
528#[cfg(test)]
529mod tests {
530 use super::*;
531
532 #[tokio::test]
533 async fn test_http_client_creation() {
534 let config = HttpConfig::default();
535 let client = HttpClient::new(config);
536 assert!(client.is_ok());
537 }
538
539 #[tokio::test]
540 async fn test_http_config_default() {
541 let config = HttpConfig::default();
542 assert_eq!(config.timeout, 30);
543 assert!(config.retry_config.is_none());
544 assert!(!config.verbose);
545 assert_eq!(config.user_agent, "ccxt-rust/1.0");
546 assert!(config.enable_rate_limit);
547 }
548
549 #[tokio::test]
550 async fn test_headers_to_json() {
551 let mut headers = HeaderMap::new();
552 headers.insert("Content-Type", "application/json".parse().unwrap());
553 headers.insert("X-Custom-Header", "test-value".parse().unwrap());
554
555 let json = headers_to_json(&headers);
556 assert!(json.is_object());
557
558 let obj = json.as_object().unwrap();
559 assert_eq!(obj.get("content-type").unwrap(), "application/json");
560 assert_eq!(obj.get("x-custom-header").unwrap(), "test-value");
561 }
562
563 #[tokio::test]
564 async fn test_http_client_with_proxy() {
565 let config = HttpConfig {
566 proxy: Some("http://localhost:8080".to_string()),
567 ..Default::default()
568 };
569
570 let client = HttpClient::new(config);
572 assert!(client.is_ok());
573 }
574
575 #[tokio::test]
576 async fn test_get_request() {
577 let config = HttpConfig {
578 verbose: false,
579 ..Default::default()
580 };
581 let client = HttpClient::new(config).unwrap();
582
583 let result = client.get("https://httpbin.org/get", None).await;
585
586 match result {
589 Ok(value) => {
590 assert!(value.is_object());
591 }
592 Err(e) => {
593 warn!("Network test skipped due to: {:?}", e);
595 }
596 }
597 }
598
599 #[tokio::test]
600 async fn test_post_request() {
601 let config = HttpConfig::default();
602 let client = HttpClient::new(config).unwrap();
603
604 let body = serde_json::json!({
605 "test": "data",
606 "number": 123
607 });
608
609 let result = client
610 .post("https://httpbin.org/post", None, Some(body))
611 .await;
612
613 match result {
614 Ok(value) => {
615 assert!(value.is_object());
616 }
617 Err(e) => {
618 warn!("Network test skipped due to: {:?}", e);
619 }
620 }
621 }
622
623 #[tokio::test]
624 async fn test_http_error_handling() {
625 let config = HttpConfig::default();
626 let client = HttpClient::new(config).unwrap();
627
628 let result = client.get("https://httpbin.org/status/404", None).await;
630 assert!(result.is_err());
631
632 if let Err(e) = result {
633 match e {
634 Error::InvalidRequest(_) => {
635 }
637 Error::Exchange(_) => {
638 }
640 Error::Network(_) => {
641 }
643 _ => panic!("Unexpected error type: {:?}", e),
644 }
645 }
646 }
647
648 #[tokio::test]
649 async fn test_timeout() {
650 let config = HttpConfig {
651 timeout: 1, retry_config: Some(RetryConfig {
653 max_retries: 0,
654 ..RetryConfig::default()
655 }),
656 ..Default::default()
657 };
658 let client = HttpClient::new(config).unwrap();
659
660 let result = client.get("https://httpbin.org/delay/5", None).await;
662 assert!(result.is_err());
663
664 if let Err(e) = result {
665 match e {
666 Error::Network(_) => {
667 }
669 _ => panic!("Expected Network error, got: {:?}", e),
670 }
671 }
672 }
673
674 #[tokio::test]
675 async fn test_retry_mechanism() {
676 let config = HttpConfig {
677 retry_config: Some(RetryConfig {
678 max_retries: 2,
679 ..RetryConfig::default()
680 }),
681 verbose: true,
682 ..Default::default()
683 };
684 let client = HttpClient::new(config).unwrap();
685
686 let result = client.get("https://httpbin.org/status/503", None).await;
689
690 assert!(result.is_err());
692 }
693
694 #[tokio::test]
695 async fn test_rate_limiter_integration() {
696 use crate::rate_limiter::{RateLimiter, RateLimiterConfig};
697 use std::time::{Duration, Instant};
698
699 let limiter_config = RateLimiterConfig::new(5, Duration::from_secs(1));
701 let limiter = RateLimiter::new(limiter_config);
702
703 let config = HttpConfig {
704 enable_rate_limit: true,
705 verbose: false,
706 ..Default::default()
707 };
708
709 let client = HttpClient::new_with_rate_limiter(config, limiter).unwrap();
710
711 let start = Instant::now();
713
714 for _ in 0..10 {
716 let _ = client.get("https://httpbin.org/get", None).await;
717 }
718
719 let elapsed = start.elapsed();
720
721 assert!(
724 elapsed >= Duration::from_secs(1),
725 "Rate limiter should have delayed requests"
726 );
727 }
728
729 #[tokio::test]
730 async fn test_rate_limiter_disabled() {
731 let config = HttpConfig {
732 enable_rate_limit: false,
733 verbose: false,
734 ..Default::default()
735 };
736
737 let client = HttpClient::new(config).unwrap();
738
739 match client.get("https://httpbin.org/get", None).await {
741 Ok(_) => assert!(true),
742 Err(e) => {
743 warn!("Network test skipped due to: {:?}", e);
745 }
746 }
747 }
748}