use std::{
fmt::Debug,
path::PathBuf,
sync::{Arc, OnceLock},
time::Duration,
};
pub use bytes::Bytes;
use eyre::{Report, eyre};
use jiff::Timestamp;
use reqwest::Url;
pub use reqwest::{
Method, Request, RequestBuilder, StatusCode,
header::{self, HeaderMap},
};
use serde::Serialize;
use tracing::{Span, debug, error, field::Empty, info, instrument, warn};
pub use ustr::Ustr;
use crate::{
ConstructAuthError, RetryConfig, UrlError,
ratelimiter::{RateLimiter, clock::MonotonicClock},
retry::ExponentialBackoff,
};
pub static USER_AGENT: &str = concat!("v_exchanges_api_generics/", env!("CARGO_PKG_VERSION"));
#[derive(Clone, Debug, Default)]
pub struct Client {
client: reqwest::Client,
pub config: RequestConfig,
pub rate_limiter: Option<Arc<RateLimiter<Ustr, MonotonicClock>>>,
}
impl Client {
#[instrument(skip_all, fields(?url, ?query, request_builder = Empty))] pub async fn request<Q, B, H>(&self, method: Method, url: &str, query: Option<&Q>, body: Option<B>, handler: &H) -> Result<H::Successful, RequestError>
where
Q: Serialize + ?Sized + std::fmt::Debug,
H: RequestHandler<B>, {
let config = &self.config;
let base_url = handler.base_url(config.use_testnet)?;
let url = base_url.join(url).map_err(|_| RequestError::Other(eyre!("Failed to parse provided URL")))?;
debug!(?config);
let mock_path = config.mock_cache_dir.as_ref().map(|dir| mock_cache_path(dir, &url));
if let Some(ref path) = mock_path
&& let Ok(file) = std::fs::read_to_string(path)
&& path
.metadata()
.expect("already read the file, guaranteed to exist")
.modified()
.expect("switch OSes, you're on something stupid")
.elapsed()
.unwrap() < MOCK_CACHE_DURATION
{
debug!("Mock cache hit: {}", path.display());
let body = Bytes::from(file);
let (status, headers) = (StatusCode::OK, header::HeaderMap::new());
return handler.handle_response(status, headers, body).map_err(RequestError::HandleResponse);
}
if let Some(rl) = &self.rate_limiter {
let bucket = {
let exchange = url.host_str().and_then(|h| {
let parts: Vec<&str> = h.split('.').collect();
parts.len().checked_sub(2).map(|i| parts[i])
});
let key_name = handler.rate_limit_key_name();
let s = match (exchange, key_name.as_deref()) {
(Some(ex), Some(kn)) => format!("ip.{ex}.{kn}"),
(Some(ex), None) => format!("ip.{ex}"),
(None, _) => "ip".to_owned(),
};
Ustr::from(&s)
};
rl.until_key_ready_n(&bucket, 1).await;
}
let mut backoff = ExponentialBackoff::try_from(&config.retry).map_err(|e| RequestError::Other(eyre!("Invalid retry configuration: {e}")))?;
let mut attempt: u32 = 0;
loop {
let attempt_num = attempt + 1;
let mut request_builder = self.client.request(method.clone(), url.clone()).timeout(config.timeout);
if let Some(query) = query {
request_builder = request_builder.query(query);
}
Span::current().record("request_builder", format!("{request_builder:?}"));
if config.use_testnet
&& let Some(cache_duration) = config.cache_testnet_calls
{
let path = test_calls_path(&url, &query);
if let Ok(file) = std::fs::read_to_string(&path)
&& path
.metadata()
.expect("already read the file, guaranteed to exist")
.modified()
.expect("switch OSes, you're on something stupid")
.elapsed()
.unwrap() < cache_duration
{
let body = Bytes::from(file);
let (status, headers) = (StatusCode::OK, header::HeaderMap::new()); return handler.handle_response(status, headers, body).map_err(RequestError::HandleResponse);
}
}
let request = handler.build_request(request_builder, &body, attempt_num as u8).map_err(RequestError::BuildRequest)?;
match self.client.execute(request).await {
Ok(mut response) => {
let status = response.status();
let headers = std::mem::take(response.headers_mut());
debug!(?status, ?headers, "Received response headers");
let body: Bytes = match response.bytes().await {
Ok(b) => b,
Err(e) => {
error!(?status, ?headers, ?e, "Failed to read response body");
return Err(RequestError::ReceiveResponse(e));
}
};
{
let truncated_body = v_utils::utils::truncate_msg(std::str::from_utf8(&body)?.trim());
debug!(truncated_body);
}
if status.is_success()
&& let Some(ref path) = mock_path
{
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).ok();
}
std::fs::write(path, &body).ok();
debug!("Mock cache write: {}", path.display());
}
match config.use_testnet {
true => {
let handled = handler.handle_response(status, headers.clone(), body.clone())?;
std::fs::write(test_calls_path(&url, &query), &body).ok();
return Ok(handled);
}
false => {
return handler.handle_response(status, headers.clone(), body.clone()).map_err(|e| {
error!(?status, ?headers, body = ?v_utils::utils::truncate_msg(std::str::from_utf8(&body).unwrap_or("<invalid utf8>")), "Failed to handle response");
RequestError::HandleResponse(e)
});
}
}
}
Err(e) =>
if attempt < config.retry.max_retries && is_retryable_request_error(&e) {
let delay = backoff.next_duration();
info!(attempt = attempt_num, delay_ms = delay.as_millis(), "Retrying after network error");
if delay.is_zero() {
tokio::task::yield_now().await;
} else {
tokio::time::sleep(delay).await;
}
attempt += 1;
} else {
warn!(?e);
return Err(RequestError::SendRequest(e));
},
}
}
}
pub async fn get<Q, H>(&self, url: &str, query: &Q, handler: &H) -> Result<H::Successful, RequestError>
where
Q: Serialize + ?Sized + Debug,
H: RequestHandler<()>, {
self.request::<Q, (), H>(Method::GET, url, Some(query), None, handler).await
}
pub async fn get_no_query<H>(&self, url: &str, handler: &H) -> Result<H::Successful, RequestError>
where
H: RequestHandler<()>, {
self.request::<&[(&str, &str)], (), H>(Method::GET, url, None, None, handler).await
}
pub async fn post<B, H>(&self, url: &str, body: B, handler: &H) -> Result<H::Successful, RequestError>
where
H: RequestHandler<B>, {
self.request::<(), B, H>(Method::POST, url, None, Some(body), handler).await
}
pub async fn post_no_body<H>(&self, url: &str, handler: &H) -> Result<H::Successful, RequestError>
where
H: RequestHandler<()>, {
self.request::<(), (), H>(Method::POST, url, None, None, handler).await
}
pub async fn put<B, H>(&self, url: &str, body: B, handler: &H) -> Result<H::Successful, RequestError>
where
H: RequestHandler<B>, {
self.request::<(), B, H>(Method::PUT, url, None, Some(body), handler).await
}
pub async fn put_no_body<H>(&self, url: &str, handler: &H) -> Result<H::Successful, RequestError>
where
H: RequestHandler<()>, {
self.request::<(), (), H>(Method::PUT, url, None, None, handler).await
}
pub async fn delete<Q, H>(&self, url: &str, query: &Q, handler: &H) -> Result<H::Successful, RequestError>
where
Q: Serialize + ?Sized + Debug,
H: RequestHandler<()>, {
self.request::<Q, (), H>(Method::DELETE, url, Some(query), None, handler).await
}
pub async fn delete_no_query<H>(&self, url: &str, handler: &H) -> Result<H::Successful, RequestError>
where
H: RequestHandler<()>, {
self.request::<&[(&str, &str)], (), H>(Method::DELETE, url, None, None, handler).await
}
}
pub trait RequestHandler<B> {
type Successful;
#[allow(unused_variables)]
fn base_url(&self, is_test: bool) -> Result<url::Url, UrlError> {
Url::parse("").map_err(UrlError::Parse)
}
fn build_request(&self, builder: RequestBuilder, request_body: &Option<B>, attempt_count: u8) -> Result<Request, BuildError>;
fn handle_response(&self, status: StatusCode, headers: HeaderMap, response_body: Bytes) -> Result<Self::Successful, HandleError>;
fn rate_limit_key_name(&self) -> Option<String> {
None
}
}
#[derive(Clone, Debug, Default)]
pub struct RequestConfig {
pub retry: RetryConfig,
pub timeout: Duration = Duration::from_secs(3),
pub use_testnet: bool,
pub cache_testnet_calls: Option<Duration> = Some(Duration::from_days(30)),
pub mock_cache_dir: Option<PathBuf>,
}
#[derive(Debug, miette::Diagnostic, derive_more::Display, thiserror::Error, derive_more::From)]
pub enum HandleError {
#[diagnostic(transparent)]
Api(ApiError),
#[diagnostic(code(v_exchanges::http::handle::parse), help("The response body could not be parsed. Check if the API response format has changed."))]
Parse(Report),
}
#[non_exhaustive]
#[derive(Debug, miette::Diagnostic, derive_more::Display, thiserror::Error, derive_more::From)]
pub enum ApiError {
#[diagnostic(transparent)]
Ip(IpError),
#[diagnostic(transparent)]
Auth(AuthError),
#[error(transparent)]
Other(Report),
}
#[non_exhaustive]
#[allow(unused_assignments)] #[derive(Debug, miette::Diagnostic, thiserror::Error)]
pub enum IpError {
#[error("IP timed out or banned until {until:?}")]
#[diagnostic(code(v_exchanges::ip::timeout), help("Your IP has been rate-limited. Wait until the specified time or reduce request frequency."))]
Timeout {
until: Option<Timestamp>,
},
#[error("blocked by WAF: {msg}")]
#[diagnostic(
code(v_exchanges::ip::waf),
help("Your request was blocked by the exchange's CDN/WAF. This could be geo-blocking, rate-limiting, or a malformed request.")
)]
Waf { msg: String },
#[error("geo-blocked: {msg}")]
#[diagnostic(
code(v_exchanges::ip::geo_blocked),
help("Your IP is in a region restricted by the exchange. Use a VPN or contact the exchange for more information.")
)]
GeoBlocked { msg: String },
}
#[non_exhaustive]
#[allow(unused_assignments)] #[derive(Debug, miette::Diagnostic, thiserror::Error)]
pub enum AuthError {
#[error("API key has expired: {msg}")]
#[diagnostic(code(v_exchanges::http::api::auth::key_expired), help("Generate a new API key from the exchange dashboard."))]
KeyExpired { msg: String },
#[error("Unauthorized: {msg}")]
#[diagnostic(
code(v_exchanges::http::api::auth::unauthorized),
help("Check that your API key and secret are correct and have the required permissions.")
)]
Unauthorized { msg: String },
}
#[derive(Debug, miette::Diagnostic, thiserror::Error)]
pub enum RequestError {
#[error("failed to send HTTP request: {0}")]
#[diagnostic(code(v_exchanges::http::request::send), help("Check your network connection and firewall settings."))]
SendRequest(#[source] reqwest::Error),
#[error("failed to parse response body as UTF-8: {0}")]
#[diagnostic(code(v_exchanges::http::request::utf8))]
Utf8Error(#[from] std::str::Utf8Error),
#[error("failed to receive HTTP response: {0}")]
#[diagnostic(code(v_exchanges::http::request::receive), help("The server may have closed the connection. Try again."))]
ReceiveResponse(#[source] reqwest::Error),
#[error("handler failed to build a request: {0}")]
#[diagnostic(transparent)]
BuildRequest(#[from] BuildError),
#[error("handler failed to process the response: {0}")]
#[diagnostic(transparent)]
HandleResponse(#[from] HandleError),
#[error("{0}")]
#[diagnostic(transparent)]
Url(#[from] UrlError),
#[allow(missing_docs)]
#[error(transparent)]
Other(#[from] Report),
}
#[derive(Debug, miette::Diagnostic, derive_more::Display, thiserror::Error, derive_more::From)]
pub enum BuildError {
#[diagnostic(transparent)]
Auth(ConstructAuthError),
#[diagnostic(code(v_exchanges::http::build::url_serialization), help("Check that all request parameters can be URL-encoded."))]
UrlSerialization(serde_urlencoded::ser::Error),
#[diagnostic(code(v_exchanges::http::build::json_serialization), help("Check that all request body fields can be serialized to JSON."))]
JsonSerialization(serde_json::Error),
#[allow(missing_docs)]
#[error(transparent)]
Other(Report),
}
fn is_retryable_request_error(e: &reqwest::Error) -> bool {
e.is_timeout() || e.is_connect() || (e.is_request() && e.status().is_none())
}
static TEST_CALLS_PATH: OnceLock<PathBuf> = OnceLock::new();
fn test_calls_path<Q: Serialize>(url: &Url, query: &Option<Q>) -> PathBuf {
let base = TEST_CALLS_PATH.get_or_init(|| v_utils::xdg_cache_dir!("test_calls"));
let mut filename = url.to_string();
if query.is_some() {
filename.push('?');
filename.push_str(&serde_urlencoded::to_string(query).unwrap_or_default());
}
base.join(filename)
}
const MOCK_CACHE_DURATION: Duration = Duration::from_days(30);
fn mock_cache_path(cache_dir: &PathBuf, url: &Url) -> PathBuf {
let host = url.host_str().unwrap_or("unknown");
let path = url.path().trim_start_matches('/');
cache_dir.join(host).join(path)
}