use std::{fmt::Debug, path::PathBuf, sync::OnceLock, time::Duration};
pub use bytes::Bytes;
use eyre::{Report, eyre};
use jiff::Timestamp;
use reqwest::Url;
pub use reqwest::{
Method, Request, RequestBuilder, StatusCode,
header::{self, HeaderMap},
};
use serde::Serialize;
use tracing::{Span, debug, error, field::Empty, info, instrument, warn};
use crate::{ConstructAuthError, UrlError};
pub static USER_AGENT: &str = concat!("v_exchanges_api_generics/", env!("CARGO_PKG_VERSION"));
#[derive(Clone, Debug, Default)]
pub struct Client {
client: reqwest::Client,
pub config: RequestConfig,
}
impl Client {
#[instrument(skip_all, fields(?url, ?query, request_builder = Empty))] pub async fn request<Q, B, H>(&self, method: Method, url: &str, query: Option<&Q>, body: Option<B>, handler: &H) -> Result<H::Successful, RequestError>
where
Q: Serialize + ?Sized + std::fmt::Debug,
H: RequestHandler<B>, {
let config = &self.config;
config.verify();
let base_url = handler.base_url(config.use_testnet)?;
let url = base_url.join(url).map_err(|_| RequestError::Other(eyre!("Failed to parse provided URL")))?;
debug!(?config);
let mock_path = config.mock_cache_dir.as_ref().map(|dir| mock_cache_path(dir, &url));
if let Some(ref path) = mock_path {
if let Ok(file) = std::fs::read_to_string(path)
&& path
.metadata()
.expect("already read the file, guaranteed to exist")
.modified()
.expect("switch OSes, you're on something stupid")
.elapsed()
.unwrap() < MOCK_CACHE_DURATION
{
debug!("Mock cache hit: {}", path.display());
let body = Bytes::from(file);
let (status, headers) = (StatusCode::OK, header::HeaderMap::new());
return handler.handle_response(status, headers, body).map_err(RequestError::HandleResponse);
}
}
for i in 1..=config.max_tries {
let mut request_builder = self.client.request(method.clone(), url.clone()).timeout(config.timeout);
if let Some(query) = query {
request_builder = request_builder.query(query);
}
Span::current().record("request_builder", format!("{request_builder:?}"));
if config.use_testnet
&& let Some(cache_duration) = config.cache_testnet_calls
{
let path = test_calls_path(&url, &query);
if let Ok(file) = std::fs::read_to_string(&path)
&& path
.metadata()
.expect("already read the file, guaranteed to exist")
.modified()
.expect("switch OSes, you're on something stupid")
.elapsed()
.unwrap() < cache_duration
{
let body = Bytes::from(file);
let (status, headers) = (StatusCode::OK, header::HeaderMap::new()); return handler.handle_response(status, headers, body).map_err(RequestError::HandleResponse);
}
}
let request = handler.build_request(request_builder, &body, i).map_err(RequestError::BuildRequest)?;
match self.client.execute(request).await {
Ok(mut response) => {
let status = response.status();
let headers = std::mem::take(response.headers_mut());
debug!(?status, ?headers, "Received response headers");
let body: Bytes = match response.bytes().await {
Ok(b) => b,
Err(e) => {
error!(?status, ?headers, ?e, "Failed to read response body");
return Err(RequestError::ReceiveResponse(e));
}
};
{
let truncated_body = v_utils::utils::truncate_msg(std::str::from_utf8(&body)?.trim());
debug!(truncated_body);
}
if status.is_success() {
if let Some(ref path) = mock_path {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).ok();
}
std::fs::write(path, &body).ok();
debug!("Mock cache write: {}", path.display());
}
}
match config.use_testnet {
true => {
let handled = handler.handle_response(status, headers.clone(), body.clone())?;
std::fs::write(test_calls_path(&url, &query), &body).ok();
return Ok(handled);
}
false => {
return handler.handle_response(status, headers.clone(), body.clone()).map_err(|e| {
error!(?status, ?headers, body = ?v_utils::utils::truncate_msg(std::str::from_utf8(&body).unwrap_or("<invalid utf8>")), "Failed to handle response");
RequestError::HandleResponse(e)
});
}
}
}
Err(e) =>
if i < config.max_tries && e.is_timeout() {
info!("Retrying sending request; made so far: {i}");
tokio::time::sleep(config.retry_cooldown).await;
} else {
warn!(?e);
debug!("{:?}\nAnd then trying the .is_timeout(): {}", e.status(), e.is_timeout());
return Err(RequestError::SendRequest(e));
},
}
}
unreachable!()
}
pub async fn get<Q, H>(&self, url: &str, query: &Q, handler: &H) -> Result<H::Successful, RequestError>
where
Q: Serialize + ?Sized + Debug,
H: RequestHandler<()>, {
self.request::<Q, (), H>(Method::GET, url, Some(query), None, handler).await
}
pub async fn get_no_query<H>(&self, url: &str, handler: &H) -> Result<H::Successful, RequestError>
where
H: RequestHandler<()>, {
self.request::<&[(&str, &str)], (), H>(Method::GET, url, None, None, handler).await
}
pub async fn post<B, H>(&self, url: &str, body: B, handler: &H) -> Result<H::Successful, RequestError>
where
H: RequestHandler<B>, {
self.request::<(), B, H>(Method::POST, url, None, Some(body), handler).await
}
pub async fn post_no_body<H>(&self, url: &str, handler: &H) -> Result<H::Successful, RequestError>
where
H: RequestHandler<()>, {
self.request::<(), (), H>(Method::POST, url, None, None, handler).await
}
pub async fn put<B, H>(&self, url: &str, body: B, handler: &H) -> Result<H::Successful, RequestError>
where
H: RequestHandler<B>, {
self.request::<(), B, H>(Method::PUT, url, None, Some(body), handler).await
}
pub async fn put_no_body<H>(&self, url: &str, handler: &H) -> Result<H::Successful, RequestError>
where
H: RequestHandler<()>, {
self.request::<(), (), H>(Method::PUT, url, None, None, handler).await
}
pub async fn delete<Q, H>(&self, url: &str, query: &Q, handler: &H) -> Result<H::Successful, RequestError>
where
Q: Serialize + ?Sized + Debug,
H: RequestHandler<()>, {
self.request::<Q, (), H>(Method::DELETE, url, Some(query), None, handler).await
}
pub async fn delete_no_query<H>(&self, url: &str, handler: &H) -> Result<H::Successful, RequestError>
where
H: RequestHandler<()>, {
self.request::<&[(&str, &str)], (), H>(Method::DELETE, url, None, None, handler).await
}
}
pub trait RequestHandler<B> {
type Successful;
#[allow(unused_variables)]
fn base_url(&self, is_test: bool) -> Result<url::Url, UrlError> {
Url::parse("").map_err(UrlError::Parse)
}
fn build_request(&self, builder: RequestBuilder, request_body: &Option<B>, attempt_count: u8) -> Result<Request, BuildError>;
fn handle_response(&self, status: StatusCode, headers: HeaderMap, response_body: Bytes) -> Result<Self::Successful, HandleError>;
}
#[derive(Clone, Debug, Default)]
pub struct RequestConfig {
pub max_tries: u8 = 1,
pub retry_cooldown: Duration = Duration::from_millis(500),
pub timeout: Duration = Duration::from_secs(3),
pub use_testnet: bool,
pub cache_testnet_calls: Option<Duration> = Some(Duration::from_days(30)),
pub mock_cache_dir: Option<PathBuf>,
}
impl RequestConfig {
fn verify(&self) {
assert_ne!(self.max_tries, 0, "RequestConfig.max_tries must not be equal to 0");
}
}
#[derive(Debug, miette::Diagnostic, derive_more::Display, thiserror::Error, derive_more::From)]
pub enum HandleError {
#[diagnostic(transparent)]
Api(ApiError),
#[diagnostic(code(v_exchanges::http::handle::parse), help("The response body could not be parsed. Check if the API response format has changed."))]
Parse(Report),
}
#[non_exhaustive]
#[derive(Debug, miette::Diagnostic, thiserror::Error, derive_more::From)]
pub enum ApiError {
#[error("{0}")]
#[diagnostic(transparent)]
Ip(IpError),
#[error("{0}")]
#[diagnostic(transparent)]
Auth(AuthError),
#[error("{0}")]
#[diagnostic(code(v_exchanges::http::api::other))]
Other(Report),
}
#[non_exhaustive]
#[derive(Debug, miette::Diagnostic, thiserror::Error)]
pub enum IpError {
#[error("IP timed out or banned until {until:?}")]
#[diagnostic(code(v_exchanges::ip::timeout), help("Your IP has been rate-limited. Wait until the specified time or reduce request frequency."))]
#[allow(unused_assignments)] Timeout {
until: Option<Timestamp>,
},
#[error("blocked by WAF: {msg}")]
#[diagnostic(
code(v_exchanges::ip::waf),
help("Your request was blocked by the exchange's CDN/WAF. This could be geo-blocking, rate-limiting, or a malformed request.")
)]
Waf { msg: String },
#[error("geo-blocked: {msg}")]
#[diagnostic(
code(v_exchanges::ip::geo_blocked),
help("Your IP is in a region restricted by the exchange. Use a VPN or contact the exchange for more information.")
)]
GeoBlocked { msg: String },
}
#[non_exhaustive]
#[allow(unused_assignments)] #[derive(Debug, miette::Diagnostic, thiserror::Error)]
pub enum AuthError {
#[error("API key has expired: {msg}")]
#[diagnostic(code(v_exchanges::http::api::auth::key_expired), help("Generate a new API key from the exchange dashboard."))]
KeyExpired { msg: String },
#[error("Unauthorized: {msg}")]
#[diagnostic(
code(v_exchanges::http::api::auth::unauthorized),
help("Check that your API key and secret are correct and have the required permissions.")
)]
Unauthorized { msg: String },
}
#[derive(Debug, miette::Diagnostic, thiserror::Error)]
pub enum RequestError {
#[error("failed to send HTTP request: {0}")]
#[diagnostic(code(v_exchanges::http::request::send), help("Check your network connection and firewall settings."))]
SendRequest(#[source] reqwest::Error),
#[error("failed to parse response body as UTF-8: {0}")]
#[diagnostic(code(v_exchanges::http::request::utf8))]
Utf8Error(#[from] std::str::Utf8Error),
#[error("failed to receive HTTP response: {0}")]
#[diagnostic(code(v_exchanges::http::request::receive), help("The server may have closed the connection. Try again."))]
ReceiveResponse(#[source] reqwest::Error),
#[error("handler failed to build a request: {0}")]
#[diagnostic(transparent)]
BuildRequest(#[from] BuildError),
#[error("handler failed to process the response: {0}")]
#[diagnostic(transparent)]
HandleResponse(#[from] HandleError),
#[error("{0}")]
#[diagnostic(transparent)]
Url(#[from] UrlError),
#[allow(missing_docs)]
#[error("{0}")]
#[diagnostic(code(v_exchanges::http::request::other))]
Other(#[from] Report),
}
#[derive(Debug, miette::Diagnostic, derive_more::Display, thiserror::Error, derive_more::From)]
pub enum BuildError {
#[diagnostic(transparent)]
Auth(ConstructAuthError),
#[diagnostic(code(v_exchanges::http::build::url_serialization), help("Check that all request parameters can be URL-encoded."))]
UrlSerialization(serde_urlencoded::ser::Error),
#[diagnostic(code(v_exchanges::http::build::json_serialization), help("Check that all request body fields can be serialized to JSON."))]
JsonSerialization(serde_json::Error),
#[allow(missing_docs)]
#[diagnostic(code(v_exchanges::http::build::other))]
Other(Report),
}
static TEST_CALLS_PATH: OnceLock<PathBuf> = OnceLock::new();
fn test_calls_path<Q: Serialize>(url: &Url, query: &Option<Q>) -> PathBuf {
let base = TEST_CALLS_PATH.get_or_init(|| v_utils::xdg_cache_dir!("test_calls"));
let mut filename = url.to_string();
if query.is_some() {
filename.push('?');
filename.push_str(&serde_urlencoded::to_string(query).unwrap_or_default());
}
base.join(filename)
}
const MOCK_CACHE_DURATION: Duration = Duration::from_days(30);
fn mock_cache_path(cache_dir: &PathBuf, url: &Url) -> PathBuf {
let host = url.host_str().unwrap_or("unknown");
let path = url.path().trim_start_matches('/');
cache_dir.join(host).join(path)
}