use crate::application::auth::{Auth, Session, WebsocketInfo};
use crate::application::config::Config;
use crate::application::rate_limiter::RateLimiter;
use crate::error::AppError;
use crate::model::retry::RetryConfig;
use reqwest::Client as HttpInternalClient;
use reqwest::{Client, Method, Response, StatusCode};
use serde::Serialize;
use serde::de::DeserializeOwned;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, error, warn};
const USER_AGENT: &str = "ig-client/0.6.0";
pub struct HttpClient {
auth: Arc<Auth>,
http_client: HttpInternalClient,
config: Arc<Config>,
rate_limiter: Arc<RwLock<RateLimiter>>,
}
impl HttpClient {
pub async fn new(config: Config) -> Result<Self, AppError> {
let config = Arc::new(config);
let http_client = HttpInternalClient::builder()
.user_agent(USER_AGENT)
.build()?;
let rate_limiter = Arc::new(RwLock::new(RateLimiter::new(&config.rate_limiter)));
let auth = Arc::new(Auth::new(config.clone()));
auth.login().await?;
Ok(Self {
auth,
http_client,
config,
rate_limiter,
})
}
pub fn new_lazy(config: Config) -> Result<Self, AppError> {
let config = Arc::new(config);
let http_client = HttpInternalClient::builder()
.user_agent(USER_AGENT)
.build()?;
let rate_limiter = Arc::new(RwLock::new(RateLimiter::new(&config.rate_limiter)));
let auth = Arc::new(Auth::new(config.clone()));
Ok(Self {
auth,
http_client,
config,
rate_limiter,
})
}
pub async fn get_ws_info(&self) -> WebsocketInfo {
self.auth.get_ws_info().await
}
pub async fn get<T: DeserializeOwned>(
&self,
path: &str,
version: Option<u8>,
) -> Result<T, AppError> {
self.request(Method::GET, path, None::<()>, version).await
}
pub async fn post<B: Serialize, T: DeserializeOwned>(
&self,
path: &str,
body: B,
version: Option<u8>,
) -> Result<T, AppError> {
self.request(Method::POST, path, Some(body), version).await
}
pub async fn put<B: Serialize, T: DeserializeOwned>(
&self,
path: &str,
body: B,
version: Option<u8>,
) -> Result<T, AppError> {
self.request(Method::PUT, path, Some(body), version).await
}
pub async fn delete<T: DeserializeOwned>(
&self,
path: &str,
version: Option<u8>,
) -> Result<T, AppError> {
self.request(Method::DELETE, path, None::<()>, version)
.await
}
pub async fn post_with_delete_method<B: Serialize, T: DeserializeOwned>(
&self,
path: &str,
body: B,
version: Option<u8>,
) -> Result<T, AppError> {
match self
.request_internal_with_delete_method(path, &body, version)
.await
{
Ok(response) => self.parse_response(response).await,
Err(AppError::OAuthTokenExpired) => {
warn!("OAuth token expired, refreshing and retrying");
self.auth.refresh_token().await?;
let response = self
.request_internal_with_delete_method(path, &body, version)
.await?;
self.parse_response(response).await
}
Err(e) => Err(e),
}
}
pub async fn request<B: Serialize, T: DeserializeOwned>(
&self,
method: Method,
path: &str,
body: Option<B>,
version: Option<u8>,
) -> Result<T, AppError> {
match self
.request_internal(method.clone(), path, &body, version)
.await
{
Ok(response) => self.parse_response(response).await,
Err(AppError::OAuthTokenExpired) => {
warn!("OAuth token expired, refreshing and retrying");
self.auth.refresh_token().await?;
let response = self.request_internal(method, path, &body, version).await?;
self.parse_response(response).await
}
Err(e) => Err(e),
}
}
async fn request_internal<B: Serialize>(
&self,
method: Method,
path: &str,
body: &Option<B>,
version: Option<u8>,
) -> Result<Response, AppError> {
let session = self.auth.get_session().await?;
let url = if path.starts_with("http") {
path.to_string()
} else {
let path = path.trim_start_matches('/');
format!("{}/{}", self.config.rest_api.base_url, path)
};
let api_key = self.config.credentials.api_key.clone();
let version_owned = version.unwrap_or(1).to_string();
let auth_header_value;
let account_id;
let cst;
let x_security_token;
let mut headers = vec![
("X-IG-API-KEY", api_key.as_str()),
("Content-Type", "application/json; charset=UTF-8"),
("Accept", "application/json; charset=UTF-8"),
("Version", version_owned.as_str()),
];
if let Some(oauth) = &session.oauth_token {
auth_header_value = format!("Bearer {}", oauth.access_token);
account_id = session.account_id.clone();
headers.push(("Authorization", auth_header_value.as_str()));
headers.push(("IG-ACCOUNT-ID", account_id.as_str()));
} else if let (Some(cst_val), Some(token_val)) = (&session.cst, &session.x_security_token) {
cst = cst_val.clone();
x_security_token = token_val.clone();
headers.push(("CST", cst.as_str()));
headers.push(("X-SECURITY-TOKEN", x_security_token.as_str()));
}
make_http_request(
&self.http_client,
self.rate_limiter.clone(),
method,
&url,
headers,
body,
RetryConfig::default(),
)
.await
}
async fn request_internal_with_delete_method<B: Serialize>(
&self,
path: &str,
body: &B,
version: Option<u8>,
) -> Result<Response, AppError> {
let session = self.auth.get_session().await?;
let url = if path.starts_with("http") {
path.to_string()
} else {
let path = path.trim_start_matches('/');
format!("{}/{}", self.config.rest_api.base_url, path)
};
let api_key = self.config.credentials.api_key.clone();
let version_owned = version.unwrap_or(1).to_string();
let auth_header_value;
let account_id;
let cst;
let x_security_token;
let mut headers = vec![
("X-IG-API-KEY", api_key.as_str()),
("Content-Type", "application/json; charset=UTF-8"),
("Accept", "application/json; charset=UTF-8"),
("Version", version_owned.as_str()),
("_method", "DELETE"), ];
if let Some(oauth) = &session.oauth_token {
auth_header_value = format!("Bearer {}", oauth.access_token);
account_id = session.account_id.clone();
headers.push(("Authorization", auth_header_value.as_str()));
headers.push(("IG-ACCOUNT-ID", account_id.as_str()));
} else if let (Some(cst_val), Some(token_val)) = (&session.cst, &session.x_security_token) {
cst = cst_val.clone();
x_security_token = token_val.clone();
headers.push(("CST", cst.as_str()));
headers.push(("X-SECURITY-TOKEN", x_security_token.as_str()));
}
make_http_request(
&self.http_client,
self.rate_limiter.clone(),
Method::POST, &url,
headers,
&Some(body),
RetryConfig::default(),
)
.await
}
async fn parse_response<T: DeserializeOwned>(&self, response: Response) -> Result<T, AppError> {
Ok(response.json().await?)
}
pub async fn switch_account(
&self,
account_id: &str,
default_account: Option<bool>,
) -> Result<(), AppError> {
self.auth
.switch_account(account_id, default_account)
.await?;
Ok(())
}
pub async fn get_session(&self) -> Result<Session, AppError> {
self.auth.get_session().await
}
pub async fn logout(&self) -> Result<(), AppError> {
self.auth.logout().await
}
pub fn auth(&self) -> &Auth {
&self.auth
}
}
impl Default for HttpClient {
fn default() -> Self {
let config = Config::default();
Self::new_lazy(config).expect("failed to create default HTTP client")
}
}
pub async fn make_http_request<B: Serialize>(
client: &Client,
rate_limiter: Arc<RwLock<RateLimiter>>,
method: Method,
url: &str,
headers: Vec<(&str, &str)>,
body: &Option<B>,
retry_config: RetryConfig,
) -> Result<Response, AppError> {
let mut retry_count = 0;
let max_retries = retry_config.max_retries();
let delay_secs = retry_config.delay_secs();
loop {
{
let limiter = rate_limiter.read().await;
limiter.wait().await;
}
debug!("{} {}", method, url);
let mut request = client.request(method.clone(), url);
for (name, value) in &headers {
request = request.header(*name, *value);
}
if let Some(b) = body {
request = request.json(b);
}
let response = request.send().await?;
let status = response.status();
debug!("Response status: {}", status);
if status.is_success() {
return Ok(response);
}
match status {
StatusCode::FORBIDDEN => {
let body_text = response.text().await.unwrap_or_default();
if body_text.contains("exceeded-account-historical-data-allowance") {
error!(
"Historical data allowance exceeded (weekly quota exhausted): {}",
body_text
);
return Err(AppError::HistoricalDataAllowanceExceeded {
allowance_expiry: 0,
});
}
if body_text.contains("exceeded-api-key-allowance")
|| body_text.contains("exceeded-account-allowance")
|| body_text.contains("exceeded-account-trading-allowance")
{
retry_count += 1;
if max_retries > 0 && retry_count > max_retries {
error!(
"Rate limit exceeded after {} attempts. Max retries ({}) reached.",
retry_count - 1,
max_retries
);
return Err(AppError::RateLimitExceeded);
}
warn!(
"Rate limit exceeded (attempt {}): {}. Waiting {} seconds before retry...",
retry_count, body_text, delay_secs
);
tokio::time::sleep(tokio::time::Duration::from_secs(delay_secs)).await;
continue; }
error!("Forbidden: {}", body_text);
return Err(AppError::Unexpected(status));
}
StatusCode::UNAUTHORIZED => {
let body_text = response.text().await.unwrap_or_default();
if body_text.contains("oauth-token-invalid") {
return Err(AppError::OAuthTokenExpired);
}
error!("Unauthorized: {}", body_text);
return Err(AppError::Unauthorized);
}
_ => {
let body = response.text().await.unwrap_or_default();
error!("Request failed with status {}: {}", status, body);
return Err(AppError::Unexpected(status));
}
}
}
}