use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use reqwest::{Client, Method, Response, header::HeaderMap};
use serde_json::Value;
use crate::core::types::{ExchangeError, ExchangeResult};
/// Tunable parameters for the retry/backoff behaviour of `HttpClient`.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Upper bound on the number of attempts made for one request
    /// (the first attempt counts).
    pub max_attempts: u32,
    /// Starting value, in milliseconds, of the backoff progression; also the
    /// lower bound applied to jittered delays.
    pub initial_backoff_ms: u64,
    /// Cap, in milliseconds, applied to the computed backoff before jitter.
    pub max_backoff_ms: u64,
    /// Factor the current backoff is multiplied by to obtain the next one.
    pub backoff_multiplier: f64,
    /// Maximum fraction by which a delay may be randomly shortened.
    /// Jitter is only applied when this is greater than zero.
    pub jitter_factor: f64,
}

impl Default for RetryConfig {
    /// Conservative preset: 3 attempts, 100 ms base delay doubling up to 5 s,
    /// no jitter.
    fn default() -> Self {
        RetryConfig {
            jitter_factor: 0.0,
            backoff_multiplier: 2.0,
            max_backoff_ms: 5000,
            initial_backoff_ms: 100,
            max_attempts: 3,
        }
    }
}

impl RetryConfig {
    /// Preset for flaky endpoints: 7 attempts, 500 ms base delay growing by
    /// 1.5x up to 8 s, with up to 30% jitter.
    pub fn unreliable_api() -> Self {
        let (max_attempts, initial_backoff_ms, max_backoff_ms) = (7, 500, 8000);
        let (backoff_multiplier, jitter_factor) = (1.5, 0.3);
        RetryConfig {
            max_attempts,
            initial_backoff_ms,
            max_backoff_ms,
            backoff_multiplier,
            jitter_factor,
        }
    }
}
/// JSON-oriented HTTP client wrapping a `reqwest::Client` with a retry policy,
/// optional debug logging to stderr, and a few shared counters.
pub struct HttpClient {
/// Underlying `reqwest` client used to send every request.
client: Client,
/// Request timeout the client was configured with; also reported in
/// timeout error messages.
timeout: Duration,
/// Retry/backoff parameters used by the retry loop.
retry_config: RetryConfig,
/// When true, requests, responses and retries are printed to stderr.
/// Set at construction from the presence of the `DEBUG_API` environment variable.
debug: bool,
/// Number of request attempts started (each retry counts as one attempt).
pub requests_total: Arc<AtomicU64>,
/// Number of failed attempts recorded by the retry loop.
pub errors_total: Arc<AtomicU64>,
/// Duration, in milliseconds, of the most recent attempt as measured by the
/// retry loop.
pub last_latency_ms: Arc<AtomicU64>,
}
impl HttpClient {
/// Builds a client with the given request timeout (milliseconds) and the
/// default `RetryConfig`.
///
/// # Errors
/// Returns whatever `with_config` returns on failure.
pub fn new(timeout_ms: u64) -> ExchangeResult<Self> {
    let retry_config = RetryConfig::default();
    Self::with_config(timeout_ms, retry_config)
}
/// Builds a client with the given request timeout (milliseconds) and an
/// explicit retry policy.
///
/// Debug logging is enabled when the `DEBUG_API` environment variable can be
/// read. All counters start at zero.
///
/// # Errors
/// Returns `ExchangeError::Network` if the underlying `reqwest` client cannot
/// be built.
pub fn with_config(timeout_ms: u64, retry_config: RetryConfig) -> ExchangeResult<Self> {
    // The result is discarded on purpose — presumably because a provider may
    // already have been installed elsewhere in the process.
    let _ = rustls::crypto::ring::default_provider().install_default();

    let timeout = Duration::from_millis(timeout_ms);
    let client = match Client::builder().timeout(timeout).build() {
        Ok(built) => built,
        Err(e) => {
            return Err(ExchangeError::Network(format!(
                "Failed to create HTTP client: {}",
                e
            )))
        }
    };

    Ok(Self {
        client,
        timeout,
        retry_config,
        debug: std::env::var("DEBUG_API").is_ok(),
        requests_total: Arc::new(AtomicU64::new(0)),
        errors_total: Arc::new(AtomicU64::new(0)),
        last_latency_ms: Arc::new(AtomicU64::new(0)),
    })
}
/// Issues a GET to `url` with `params` as the query string and no extra
/// headers, returning the parsed JSON body.
///
/// Delegates to `request_with_retry`.
pub async fn get(
    &self,
    url: &str,
    params: &HashMap<String, String>,
) -> ExchangeResult<Value> {
    let no_headers = HashMap::new();
    self.request_with_retry(Method::GET, url, params, &no_headers, None)
        .await
}
/// Issues a GET to `url` with `params` as the query string and the given
/// request `headers`, returning the parsed JSON body.
///
/// Delegates to `request_with_retry`.
pub async fn get_with_headers(
    &self,
    url: &str,
    params: &HashMap<String, String>,
    headers: &HashMap<String, String>,
) -> ExchangeResult<Value> {
    let outcome = self
        .request_with_retry(Method::GET, url, params, headers, None)
        .await;
    outcome
}
/// Issues a POST to `url` with `body` serialized as JSON and the given
/// `headers`, without query parameters, returning the parsed JSON body.
///
/// Delegates to `request_with_retry`.
pub async fn post(
    &self,
    url: &str,
    body: &Value,
    headers: &HashMap<String, String>,
) -> ExchangeResult<Value> {
    let no_params = HashMap::new();
    self.request_with_retry(Method::POST, url, &no_params, headers, Some(body))
        .await
}
/// Issues a POST to `url` carrying both query `params` and a JSON `body`,
/// with the given `headers`, returning the parsed JSON body.
///
/// Delegates to `request_with_retry`.
pub async fn post_with_params(
    &self,
    url: &str,
    params: &HashMap<String, String>,
    body: &Value,
    headers: &HashMap<String, String>,
) -> ExchangeResult<Value> {
    let payload = Some(body);
    self.request_with_retry(Method::POST, url, params, headers, payload)
        .await
}
/// Issues a DELETE to `url` with `params` as the query string and the given
/// `headers`, without a body, returning the parsed JSON body.
///
/// Delegates to `request_with_retry`.
pub async fn delete(
    &self,
    url: &str,
    params: &HashMap<String, String>,
    headers: &HashMap<String, String>,
) -> ExchangeResult<Value> {
    let outcome = self
        .request_with_retry(Method::DELETE, url, params, headers, None)
        .await;
    outcome
}
/// Issues a DELETE to `url` with `body` serialized as JSON and the given
/// `headers`, without query parameters, returning the parsed JSON body.
///
/// Delegates to `request_with_retry`.
pub async fn delete_with_body(
    &self,
    url: &str,
    body: &Value,
    headers: &HashMap<String, String>,
) -> ExchangeResult<Value> {
    let no_params = HashMap::new();
    self.request_with_retry(Method::DELETE, url, &no_params, headers, Some(body))
        .await
}
/// Issues a PUT to `url` with `body` serialized as JSON and the given
/// `headers`, without query parameters, returning the parsed JSON body.
///
/// Delegates to `request_with_retry`.
pub async fn put(
    &self,
    url: &str,
    body: &Value,
    headers: &HashMap<String, String>,
) -> ExchangeResult<Value> {
    let no_params = HashMap::new();
    self.request_with_retry(Method::PUT, url, &no_params, headers, Some(body))
        .await
}
/// Issues a PATCH to `url` with `body` serialized as JSON and the given
/// `headers`, without query parameters, returning the parsed JSON body.
///
/// Delegates to `request_with_retry`.
pub async fn patch(
    &self,
    url: &str,
    body: &Value,
    headers: &HashMap<String, String>,
) -> ExchangeResult<Value> {
    let no_params = HashMap::new();
    self.request_with_retry(Method::PATCH, url, &no_params, headers, Some(body))
        .await
}
/// Like a GET with headers, but also hands back the response headers
/// alongside the parsed JSON body.
///
/// Delegates to `request_returning_headers`.
pub async fn get_with_response_headers(
    &self,
    url: &str,
    params: &HashMap<String, String>,
    headers: &HashMap<String, String>,
) -> ExchangeResult<(Value, HeaderMap)> {
    let outcome = self
        .request_returning_headers(Method::GET, url, params, headers, None)
        .await;
    outcome
}
/// Like a POST with a JSON `body`, but also hands back the response headers
/// alongside the parsed JSON body. No query parameters are sent.
///
/// Delegates to `request_returning_headers`.
pub async fn post_with_response_headers(
    &self,
    url: &str,
    body: &Value,
    headers: &HashMap<String, String>,
) -> ExchangeResult<(Value, HeaderMap)> {
    let no_params = HashMap::new();
    self.request_returning_headers(Method::POST, url, &no_params, headers, Some(body))
        .await
}
/// Like a DELETE with query `params`, but also hands back the response
/// headers alongside the parsed JSON body. No request body is sent.
///
/// Delegates to `request_returning_headers`.
pub async fn delete_with_response_headers(
    &self,
    url: &str,
    params: &HashMap<String, String>,
    headers: &HashMap<String, String>,
) -> ExchangeResult<(Value, HeaderMap)> {
    let outcome = self
        .request_returning_headers(Method::DELETE, url, params, headers, None)
        .await;
    outcome
}
/// Downloads `url` with a plain GET and returns the raw response bytes.
///
/// This path talks to the underlying client directly: it does not go through
/// the retry loop and does not touch the counters.
///
/// # Errors
/// * `ExchangeError::Network` if sending the request or reading the body fails.
/// * `ExchangeError::Api` carrying the HTTP status code for any non-success status.
pub async fn get_bytes(&self, url: &str) -> ExchangeResult<Vec<u8>> {
    let response = match self.client.get(url).send().await {
        Ok(received) => received,
        Err(e) => {
            return Err(ExchangeError::Network(format!(
                "Failed to download {}: {}",
                url, e
            )))
        }
    };

    let status = response.status();
    if !status.is_success() {
        return Err(ExchangeError::Api {
            code: i32::from(status.as_u16()),
            message: format!("HTTP {} for {}", status, url),
        });
    }

    match response.bytes().await {
        Ok(payload) => Ok(payload.to_vec()),
        Err(e) => Err(ExchangeError::Network(format!(
            "Failed to read bytes: {}",
            e
        ))),
    }
}
/// Returns a snapshot of the counters as
/// `(requests_total, errors_total, last_latency_ms)`.
///
/// Each value is read independently with relaxed ordering, so the three
/// numbers are not guaranteed to belong to the same instant.
pub fn stats(&self) -> (u64, u64, u64) {
    let requests = self.requests_total.load(Ordering::Relaxed);
    let errors = self.errors_total.load(Ordering::Relaxed);
    let latency_ms = self.last_latency_ms.load(Ordering::Relaxed);
    (requests, errors, latency_ms)
}
async fn request_with_retry(
&self,
method: Method,
url: &str,
params: &HashMap<String, String>,
headers: &HashMap<String, String>,
body: Option<&Value>,
) -> ExchangeResult<Value> {
let mut last_error = ExchangeError::Network("No attempts made".to_string());
let mut current_backoff = self.retry_config.initial_backoff_ms;
for attempt in 0..self.retry_config.max_attempts {
if attempt > 0 {
self.log_retry(attempt, current_backoff, &last_error);
tokio::time::sleep(Duration::from_millis(current_backoff)).await;
}
self.requests_total.fetch_add(1, Ordering::Relaxed);
let start = std::time::Instant::now();
match self.execute_request(&method, url, params, headers, body).await {
Ok(response) => {
let elapsed = start.elapsed().as_millis() as u64;
self.last_latency_ms.store(elapsed, Ordering::Relaxed);
return self.handle_response(response, attempt).await;
}
Err(e) => {
let elapsed = start.elapsed().as_millis() as u64;
self.last_latency_ms.store(elapsed, Ordering::Relaxed);
self.errors_total.fetch_add(1, Ordering::Relaxed);
last_error = e;
if !self.should_retry(&last_error) {
return Err(last_error);
}
current_backoff = self.calculate_next_backoff(current_backoff);
}
}
}
Err(ExchangeError::Network(format!(
"Max retries ({}) exceeded. Last error: {}",
self.retry_config.max_attempts, last_error
)))
}
async fn request_returning_headers(
&self,
method: Method,
url: &str,
params: &HashMap<String, String>,
headers: &HashMap<String, String>,
body: Option<&Value>,
) -> ExchangeResult<(Value, HeaderMap)> {
let mut last_error = ExchangeError::Network("No attempts made".to_string());
let mut current_backoff = self.retry_config.initial_backoff_ms;
for attempt in 0..self.retry_config.max_attempts {
if attempt > 0 {
self.log_retry(attempt, current_backoff, &last_error);
tokio::time::sleep(Duration::from_millis(current_backoff)).await;
}
self.requests_total.fetch_add(1, Ordering::Relaxed);
let start = std::time::Instant::now();
match self.execute_request(&method, url, params, headers, body).await {
Ok(response) => {
let elapsed = start.elapsed().as_millis() as u64;
self.last_latency_ms.store(elapsed, Ordering::Relaxed);
return self.handle_response_with_headers(response, attempt).await;
}
Err(e) => {
let elapsed = start.elapsed().as_millis() as u64;
self.last_latency_ms.store(elapsed, Ordering::Relaxed);
self.errors_total.fetch_add(1, Ordering::Relaxed);
last_error = e;
if !self.should_retry(&last_error) {
return Err(last_error);
}
current_backoff = self.calculate_next_backoff(current_backoff);
}
}
}
Err(ExchangeError::Network(format!(
"Max retries ({}) exceeded. Last error: {}",
self.retry_config.max_attempts, last_error
)))
}
/// Builds and sends a single request, without any retry.
///
/// The query string is attached only when `params` is non-empty, every entry
/// of `headers` is added as a request header, and `body` (when present) is
/// sent as JSON. The request is debug-logged just before it is sent.
///
/// # Errors
/// Any `reqwest` send failure, translated by `map_reqwest_error`.
async fn execute_request(
    &self,
    method: &Method,
    url: &str,
    params: &HashMap<String, String>,
    headers: &HashMap<String, String>,
    body: Option<&Value>,
) -> Result<Response, ExchangeError> {
    let base = self.client.request(method.clone(), url);
    let with_query = if params.is_empty() {
        base
    } else {
        base.query(params)
    };
    let with_headers = headers.iter().fold(with_query, |builder, (name, value)| {
        builder.header(name.as_str(), value.as_str())
    });
    let request = match body {
        Some(payload) => with_headers.json(payload),
        None => with_headers,
    };

    self.log_request(method, url, params);

    match request.send().await {
        Ok(response) => Ok(response),
        Err(e) => Err(self.map_reqwest_error(e)),
    }
}
/// Turns a received response into parsed JSON or a typed error.
///
/// The body is always read and debug-logged first. Then:
/// * status 429 becomes `RateLimitExceeded`, carrying the parsed
///   `Retry-After` value (if any) and the extracted error message;
/// * a success status yields the body parsed as JSON, or `ParseError` if it
///   is not valid JSON;
/// * any other status is mapped by `map_http_error`.
///
/// `_attempt` is currently unused.
///
/// # Errors
/// Also returns `ExchangeError::Network` if the body cannot be read.
async fn handle_response(&self, response: Response, _attempt: u32) -> ExchangeResult<Value> {
    let status = response.status();
    let status_code = status.as_u16();
    // Must be captured before `text()` consumes the response.
    let retry_after = self.extract_retry_after(&response);

    let body = match response.text().await {
        Ok(text) => text,
        Err(e) => {
            return Err(ExchangeError::Network(format!(
                "Failed to read response: {}",
                e
            )))
        }
    };
    self.log_response(status_code, &body);

    match status_code {
        429 => Err(ExchangeError::RateLimitExceeded {
            retry_after,
            message: self.extract_error_message(&body),
        }),
        _ if status.is_success() => serde_json::from_str(&body).map_err(|e| {
            ExchangeError::ParseError(format!("Invalid JSON: {} - Body: {}", e, body))
        }),
        _ => {
            let parsed = serde_json::from_str::<Value>(&body).ok();
            Err(self.map_http_error(status_code, parsed.as_ref(), &body))
        }
    }
}
/// Same contract as `handle_response`, but on success also returns a copy of
/// the response headers.
///
/// Headers and the `Retry-After` value are captured before the body is read.
/// Status 429 becomes `RateLimitExceeded`, a non-success status is mapped by
/// `map_http_error`, and a success status yields `(json, headers)` or
/// `ParseError` when the body is not valid JSON.
///
/// `_attempt` is currently unused.
///
/// # Errors
/// Also returns `ExchangeError::Network` if the body cannot be read.
async fn handle_response_with_headers(
    &self,
    response: Response,
    _attempt: u32,
) -> ExchangeResult<(Value, HeaderMap)> {
    let status = response.status();
    let status_code = status.as_u16();
    let resp_headers = response.headers().clone();
    let retry_after = self.extract_retry_after(&response);

    let body = response
        .text()
        .await
        .map_err(|e| ExchangeError::Network(format!("Failed to read response: {}", e)))?;
    self.log_response(status_code, &body);

    if status_code == 429 {
        let message = self.extract_error_message(&body);
        return Err(ExchangeError::RateLimitExceeded {
            retry_after,
            message,
        });
    }

    if !status.is_success() {
        let parsed = serde_json::from_str::<Value>(&body).ok();
        return Err(self.map_http_error(status_code, parsed.as_ref(), &body));
    }

    match serde_json::from_str::<Value>(&body) {
        Ok(value) => Ok((value, resp_headers)),
        Err(e) => Err(ExchangeError::ParseError(format!(
            "Invalid JSON: {} - Body: {}",
            e, body
        ))),
    }
}
fn should_retry(&self, error: &ExchangeError) -> bool {
match error {
ExchangeError::Network(_) => true,
ExchangeError::Timeout(_) => true,
ExchangeError::RateLimitExceeded { .. } => true,
ExchangeError::Api { code, .. } if *code >= 500 => true,
_ => false,
}
}
/// Computes the backoff that follows `current` (milliseconds).
///
/// The value is multiplied by `backoff_multiplier`, truncated to an integer,
/// and capped at `max_backoff_ms`. Jitter is applied to the capped value only
/// when `jitter_factor` is greater than zero.
fn calculate_next_backoff(&self, current: u64) -> u64 {
    let config = &self.retry_config;
    let scaled = (current as f64 * config.backoff_multiplier) as u64;
    let capped = std::cmp::min(scaled, config.max_backoff_ms);
    if config.jitter_factor > 0.0 {
        return self.apply_jitter(capped);
    }
    capped
}
/// Randomly shortens `backoff_ms` by up to `jitter_factor` of its value,
/// never going below `initial_backoff_ms`.
///
/// The random fraction comes from hashing the current timestamp with a
/// freshly created `RandomState`, reduced to a value in `[0, 1)` with four
/// decimal digits of resolution.
fn apply_jitter(&self, backoff_ms: u64) -> u64 {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};

    let mut hasher = RandomState::new().build_hasher();
    hasher.write_u64(crate::core::timestamp_millis());
    let unit_fraction = (hasher.finish() % 10000) as f64 / 10000.0;

    let scale = 1.0 - self.retry_config.jitter_factor * unit_fraction;
    let floor_ms = self.retry_config.initial_backoff_ms as f64;
    let jittered_ms = backoff_ms as f64 * scale;
    jittered_ms.max(floor_ms) as u64
}
/// Reads the `Retry-After` response header as an unsigned integer.
///
/// Returns `None` when the header is absent, is not valid visible ASCII, or
/// does not parse as a `u64` (for instance when it holds a date).
fn extract_retry_after(&self, response: &Response) -> Option<u64> {
    let raw = response.headers().get("Retry-After")?;
    let text = raw.to_str().ok()?;
    text.parse::<u64>().ok()
}
/// Translates a `reqwest::Error` raised while sending into an `ExchangeError`.
///
/// * timeout        -> `Timeout` (retryable)
/// * connect error  -> `Network` (retryable)
/// * builder error  -> `InvalidRequest` (e.g. malformed URL or body that
///   could not be serialized; retrying cannot help)
/// * anything else  -> `Network` (retryable)
///
/// The previous version classified on `is_request()`. reqwest reports
/// failures that occur while sending (connection reset, incomplete message,
/// and similar transient conditions) as request errors, so those were
/// mislabelled `InvalidRequest` and never retried. Genuinely malformed
/// requests surface as builder errors and were treated as retryable `Network`.
fn map_reqwest_error(&self, error: reqwest::Error) -> ExchangeError {
    if error.is_timeout() {
        ExchangeError::Timeout(format!("Request timed out after {:?}", self.timeout))
    } else if error.is_connect() {
        ExchangeError::Network(format!("Connection failed: {}", error))
    } else if error.is_builder() {
        ExchangeError::InvalidRequest(format!("Invalid request: {}", error))
    } else {
        ExchangeError::Network(format!("HTTP error: {}", error))
    }
}
/// Maps a non-success HTTP status plus its body to a typed `ExchangeError`.
///
/// The message is taken from the JSON body when one of the known message
/// keys is present, otherwise the raw body is used. The error code is the
/// integer JSON `code` field when present and representable as `i32`,
/// otherwise the HTTP status.
///
/// Status mapping: 401 -> `InvalidCredentials`, 403 -> `PermissionDenied`,
/// 429 -> `RateLimitExceeded` (without `retry_after`), 400/422 ->
/// `InvalidRequest`, 404 and 5xx -> `Api` with a prefixed message, anything
/// else -> `Api`.
fn map_http_error(&self, status: u16, json: Option<&Value>, raw_body: &str) -> ExchangeError {
    // Explicit import keeps this compiling on editions whose prelude lacks `TryFrom`.
    use std::convert::TryFrom;

    let message = json
        .and_then(|j| self.extract_error_from_json(j))
        .unwrap_or_else(|| raw_body.to_string());

    // A plain `as i32` cast would silently wrap codes outside the i32 range
    // into unrelated values; fall back to the HTTP status instead.
    let code = json
        .and_then(|j| j.get("code"))
        .and_then(|v| v.as_i64())
        .and_then(|c| i32::try_from(c).ok())
        .unwrap_or_else(|| i32::from(status));

    match status {
        401 => ExchangeError::InvalidCredentials(message),
        403 => ExchangeError::PermissionDenied(message),
        429 => ExchangeError::RateLimitExceeded {
            retry_after: None,
            message,
        },
        400 | 422 => ExchangeError::InvalidRequest(message),
        404 => ExchangeError::Api {
            code,
            message: format!("Not found: {}", message),
        },
        500..=599 => ExchangeError::Api {
            code,
            message: format!("Server error: {}", message),
        },
        _ => ExchangeError::Api { code, message },
    }
}
/// Pulls a human-readable error message out of a JSON error body.
///
/// The keys `msg`, `message`, `error`, `err_msg` and `retMsg` are tried in
/// that order, and the first one holding a string wins. A key that is present
/// but not a string (e.g. `"msg": null` or an object under `error`) is
/// skipped. Previously such a key ended the search and hid a usable message
/// stored under a later key.
///
/// Returns `None` when no key holds a string.
fn extract_error_from_json(&self, json: &Value) -> Option<String> {
    const MESSAGE_KEYS: [&str; 5] = ["msg", "message", "error", "err_msg", "retMsg"];

    MESSAGE_KEYS
        .iter()
        .find_map(|key| json.get(*key).and_then(|v| v.as_str()))
        .map(|s| s.to_string())
}
/// Produces an error message for a raw response body.
///
/// When the body is JSON and `extract_error_from_json` finds a message, that
/// message is returned; otherwise the body itself is returned verbatim.
fn extract_error_message(&self, body: &str) -> String {
    match serde_json::from_str::<Value>(body) {
        Ok(parsed) => self
            .extract_error_from_json(&parsed)
            .unwrap_or_else(|| body.to_string()),
        Err(_) => body.to_string(),
    }
}
/// Prints the method, URL and query parameters to stderr when debug logging
/// is enabled; does nothing otherwise.
fn log_request(&self, method: &Method, url: &str, params: &HashMap<String, String>) {
    if !self.debug {
        return;
    }
    eprintln!("[HTTP] {} {} params={:?}", method, url, params);
}
/// Prints the status code and the response body to stderr when debug logging
/// is enabled; does nothing otherwise.
///
/// Bodies longer than 500 bytes are truncated and suffixed with `...`. The
/// cut is moved back to the nearest UTF-8 character boundary: slicing at a
/// fixed byte offset panics when that offset lands inside a multi-byte
/// character.
fn log_response(&self, status: u16, body: &str) {
    const MAX_LOGGED_BYTES: usize = 500;

    if !self.debug {
        return;
    }

    eprintln!("[HTTP] Status: {}", status);
    if body.len() > MAX_LOGGED_BYTES {
        // Offset 0 is always a boundary, so this loop terminates.
        let mut end = MAX_LOGGED_BYTES;
        while !body.is_char_boundary(end) {
            end -= 1;
        }
        eprintln!("[HTTP] Response: {}...", &body[..end]);
    } else {
        eprintln!("[HTTP] Response: {}", body);
    }
}
/// Prints the retry attempt number, the delay about to be slept
/// (milliseconds) and the error that triggered the retry to stderr when debug
/// logging is enabled; does nothing otherwise.
fn log_retry(&self, attempt: u32, backoff: u64, error: &ExchangeError) {
    if !self.debug {
        return;
    }
    eprintln!(
        "[HTTP] Retry attempt {} after {}ms. Error: {}",
        attempt, backoff, error
    );
}
}
impl Default for HttpClient {
    /// Builds a client with a 10-second timeout and the default retry policy.
    ///
    /// # Panics
    /// Panics if the underlying HTTP client cannot be created.
    fn default() -> Self {
        const DEFAULT_TIMEOUT_MS: u64 = 10_000;
        let built = Self::new(DEFAULT_TIMEOUT_MS);
        built.expect("Failed to create default HTTP client")
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The default preset keeps its documented attempt count and delay bounds.
    #[test]
    fn test_retry_config_default() {
        let RetryConfig {
            max_attempts,
            initial_backoff_ms,
            max_backoff_ms,
            ..
        } = RetryConfig::default();
        assert_eq!(max_attempts, 3);
        assert_eq!(initial_backoff_ms, 100);
        assert_eq!(max_backoff_ms, 5000);
    }

    /// With the default config (no jitter) the backoff doubles and is capped
    /// at the configured maximum.
    #[test]
    fn test_calculate_backoff() {
        let client = HttpClient::new(1000).unwrap();
        let cases: [(u64, u64); 4] = [(100, 200), (200, 400), (3000, 5000), (5000, 5000)];
        for (current, expected) in cases {
            assert_eq!(client.calculate_next_backoff(current), expected);
        }
    }

    /// Transport, rate-limit and 5xx errors are retryable; credential,
    /// permission, request and 4xx API errors are not.
    #[test]
    fn test_should_retry() {
        let client = HttpClient::new(1000).unwrap();

        let retryable = [
            ExchangeError::Network("test".into()),
            ExchangeError::Timeout("test".into()),
            ExchangeError::RateLimitExceeded { retry_after: None, message: "test".into() },
            ExchangeError::Api { code: 500, message: "test".into() },
            ExchangeError::Api { code: 503, message: "test".into() },
        ];
        for error in &retryable {
            assert!(client.should_retry(error));
        }

        let not_retryable = [
            ExchangeError::InvalidCredentials("test".into()),
            ExchangeError::PermissionDenied("test".into()),
            ExchangeError::InvalidRequest("test".into()),
            ExchangeError::Api { code: 400, message: "test".into() },
        ];
        for error in &not_retryable {
            assert!(!client.should_retry(error));
        }
    }
}