1use crate::application::auth::{Auth, Session, WebsocketInfo};
8use crate::application::config::Config;
9use crate::application::rate_limiter::RateLimiter;
10use crate::error::AppError;
11use crate::model::retry::RetryConfig;
12use reqwest::Client as HttpInternalClient;
13use reqwest::{Client, Method, Response, StatusCode};
14use serde::Serialize;
15use serde::de::DeserializeOwned;
16use std::sync::Arc;
17use tokio::sync::RwLock;
18use tracing::{debug, error, warn};
19
20const USER_AGENT: &str = "ig-client/0.6.0";
21
22pub struct HttpClient {
31 auth: Arc<Auth>,
32 http_client: HttpInternalClient,
33 config: Arc<Config>,
34 rate_limiter: Arc<RwLock<RateLimiter>>,
35}
36
37impl HttpClient {
38 pub async fn new(config: Config) -> Result<Self, AppError> {
47 let config = Arc::new(config);
48
49 let http_client = HttpInternalClient::builder()
51 .user_agent(USER_AGENT)
52 .build()?;
53 let rate_limiter = Arc::new(RwLock::new(RateLimiter::new(&config.rate_limiter)));
54
55 let auth = Arc::new(Auth::new(config.clone()));
57
58 auth.login().await?;
60
61 Ok(Self {
62 auth,
63 http_client,
64 config,
65 rate_limiter,
66 })
67 }
68
69 pub fn new_lazy(config: Config) -> Self {
71 let config = Arc::new(config);
72
73 let http_client = HttpInternalClient::builder()
75 .user_agent(USER_AGENT)
76 .build()
77 .expect("Failed to create HTTP client");
78 let rate_limiter = Arc::new(RwLock::new(RateLimiter::new(&config.rate_limiter)));
79
80 let auth = Arc::new(Auth::new(config.clone()));
82
83 Self {
84 auth,
85 http_client,
86 config,
87 rate_limiter,
88 }
89 }
90
91 pub async fn get_ws_info(&self) -> WebsocketInfo {
96 self.auth.get_ws_info().await
97 }
98
99 pub async fn get<T: DeserializeOwned>(
101 &self,
102 path: &str,
103 version: Option<u8>,
104 ) -> Result<T, AppError> {
105 self.request(Method::GET, path, None::<()>, version).await
106 }
107
108 pub async fn post<B: Serialize, T: DeserializeOwned>(
110 &self,
111 path: &str,
112 body: B,
113 version: Option<u8>,
114 ) -> Result<T, AppError> {
115 self.request(Method::POST, path, Some(body), version).await
116 }
117
118 pub async fn put<B: Serialize, T: DeserializeOwned>(
120 &self,
121 path: &str,
122 body: B,
123 version: Option<u8>,
124 ) -> Result<T, AppError> {
125 self.request(Method::PUT, path, Some(body), version).await
126 }
127
128 pub async fn delete<T: DeserializeOwned>(&self, path: &str) -> Result<T, AppError> {
130 self.request(Method::DELETE, path, None::<()>, None).await
131 }
132
133 pub async fn post_with_delete_method<B: Serialize, T: DeserializeOwned>(
146 &self,
147 path: &str,
148 body: B,
149 version: Option<u8>,
150 ) -> Result<T, AppError> {
151 match self
152 .request_internal_with_delete_method(path, &body, version)
153 .await
154 {
155 Ok(response) => self.parse_response(response).await,
156 Err(AppError::OAuthTokenExpired) => {
157 warn!("OAuth token expired, refreshing and retrying");
158 self.auth.refresh_token().await?;
159 let response = self
160 .request_internal_with_delete_method(path, &body, version)
161 .await?;
162 self.parse_response(response).await
163 }
164 Err(e) => Err(e),
165 }
166 }
167
168 pub async fn request<B: Serialize, T: DeserializeOwned>(
170 &self,
171 method: Method,
172 path: &str,
173 body: Option<B>,
174 version: Option<u8>,
175 ) -> Result<T, AppError> {
176 match self
177 .request_internal(method.clone(), path, &body, version)
178 .await
179 {
180 Ok(response) => self.parse_response(response).await,
181 Err(AppError::OAuthTokenExpired) => {
182 warn!("OAuth token expired, refreshing and retrying");
183 self.auth.refresh_token().await?;
184 let response = self.request_internal(method, path, &body, version).await?;
185 self.parse_response(response).await
186 }
187 Err(e) => Err(e),
188 }
189 }
190
191 async fn request_internal<B: Serialize>(
193 &self,
194 method: Method,
195 path: &str,
196 body: &Option<B>,
197 version: Option<u8>,
198 ) -> Result<Response, AppError> {
199 let session = self.auth.get_session().await?;
200
201 let url = if path.starts_with("http") {
202 path.to_string()
203 } else {
204 let path = path.trim_start_matches('/');
205 format!("{}/{}", self.config.rest_api.base_url, path)
206 };
207
208 let api_key = self.config.credentials.api_key.clone();
209 let version_owned = version.unwrap_or(1).to_string();
210 let auth_header_value;
211 let account_id;
212 let cst;
213 let x_security_token;
214
215 let mut headers = vec![
216 ("X-IG-API-KEY", api_key.as_str()),
217 ("Content-Type", "application/json; charset=UTF-8"),
218 ("Accept", "application/json; charset=UTF-8"),
219 ("Version", version_owned.as_str()),
220 ];
221
222 if let Some(oauth) = &session.oauth_token {
223 auth_header_value = format!("Bearer {}", oauth.access_token);
224 account_id = session.account_id.clone();
225 headers.push(("Authorization", auth_header_value.as_str()));
226 headers.push(("IG-ACCOUNT-ID", account_id.as_str()));
227 } else if let (Some(cst_val), Some(token_val)) = (&session.cst, &session.x_security_token) {
228 cst = cst_val.clone();
229 x_security_token = token_val.clone();
230 headers.push(("CST", cst.as_str()));
231 headers.push(("X-SECURITY-TOKEN", x_security_token.as_str()));
232 }
233
234 make_http_request(
235 &self.http_client,
236 self.rate_limiter.clone(),
237 method,
238 &url,
239 headers,
240 body,
241 RetryConfig::infinite(),
242 )
243 .await
244 }
245
246 async fn request_internal_with_delete_method<B: Serialize>(
250 &self,
251 path: &str,
252 body: &B,
253 version: Option<u8>,
254 ) -> Result<Response, AppError> {
255 let session = self.auth.get_session().await?;
256
257 let url = if path.starts_with("http") {
258 path.to_string()
259 } else {
260 let path = path.trim_start_matches('/');
261 format!("{}/{}", self.config.rest_api.base_url, path)
262 };
263
264 let api_key = self.config.credentials.api_key.clone();
265 let version_owned = version.unwrap_or(1).to_string();
266 let auth_header_value;
267 let account_id;
268 let cst;
269 let x_security_token;
270
271 let mut headers = vec![
272 ("X-IG-API-KEY", api_key.as_str()),
273 ("Content-Type", "application/json; charset=UTF-8"),
274 ("Accept", "application/json; charset=UTF-8"),
275 ("Version", version_owned.as_str()),
276 ("_method", "DELETE"), ];
278
279 if let Some(oauth) = &session.oauth_token {
280 auth_header_value = format!("Bearer {}", oauth.access_token);
281 account_id = session.account_id.clone();
282 headers.push(("Authorization", auth_header_value.as_str()));
283 headers.push(("IG-ACCOUNT-ID", account_id.as_str()));
284 } else if let (Some(cst_val), Some(token_val)) = (&session.cst, &session.x_security_token) {
285 cst = cst_val.clone();
286 x_security_token = token_val.clone();
287 headers.push(("CST", cst.as_str()));
288 headers.push(("X-SECURITY-TOKEN", x_security_token.as_str()));
289 }
290
291 make_http_request(
292 &self.http_client,
293 self.rate_limiter.clone(),
294 Method::POST, &url,
296 headers,
297 &Some(body),
298 RetryConfig::infinite(),
299 )
300 .await
301 }
302
303 async fn parse_response<T: DeserializeOwned>(&self, response: Response) -> Result<T, AppError> {
305 Ok(response.json().await?)
306 }
307
308 pub async fn switch_account(
310 &self,
311 account_id: &str,
312 default_account: Option<bool>,
313 ) -> Result<(), AppError> {
314 self.auth
315 .switch_account(account_id, default_account)
316 .await?;
317 Ok(())
318 }
319
320 pub async fn get_session(&self) -> Result<Session, AppError> {
322 self.auth.get_session().await
323 }
324
325 pub async fn logout(&self) -> Result<(), AppError> {
327 self.auth.logout().await
328 }
329
330 pub fn auth(&self) -> &Auth {
332 &self.auth
333 }
334}
335
336impl Default for HttpClient {
337 fn default() -> Self {
338 let config = Config::default();
339 Self::new_lazy(config)
340 }
341}
342
343pub async fn make_http_request<B: Serialize>(
423 client: &Client,
424 rate_limiter: Arc<RwLock<RateLimiter>>,
425 method: Method,
426 url: &str,
427 headers: Vec<(&str, &str)>,
428 body: &Option<B>,
429 retry_config: RetryConfig,
430) -> Result<Response, AppError> {
431 let mut retry_count = 0;
432 let max_retries = retry_config.max_retries();
433 let delay_secs = retry_config.delay_secs();
434
435 loop {
436 {
438 let limiter = rate_limiter.read().await;
439 limiter.wait().await;
440 }
441
442 debug!("{} {}", method, url);
443
444 let mut request = client.request(method.clone(), url);
446
447 for (name, value) in &headers {
449 request = request.header(*name, *value);
450 }
451
452 if let Some(b) = body {
454 request = request.json(b);
455 }
456
457 let response = request.send().await?;
459 let status = response.status();
460 debug!("Response status: {}", status);
461
462 if status.is_success() {
463 return Ok(response);
464 }
465
466 match status {
467 StatusCode::FORBIDDEN => {
468 let body_text = response.text().await.unwrap_or_default();
469 if body_text.contains("exceeded-api-key-allowance")
470 || body_text.contains("exceeded-account-allowance")
471 || body_text.contains("exceeded-account-trading-allowance")
472 || body_text.contains("exceeded-account-historical-data-allowance")
473 {
474 retry_count += 1;
475
476 if max_retries > 0 && retry_count > max_retries {
478 error!(
479 "Rate limit exceeded after {} attempts. Max retries ({}) reached.",
480 retry_count - 1,
481 max_retries
482 );
483 return Err(AppError::RateLimitExceeded);
484 }
485
486 warn!(
487 "Rate limit exceeded (attempt {}): {}. Waiting {} seconds before retry...",
488 retry_count, body_text, delay_secs
489 );
490 tokio::time::sleep(tokio::time::Duration::from_secs(delay_secs)).await;
491 continue; }
493 error!("Forbidden: {}", body_text);
494 return Err(AppError::Unexpected(status));
495 }
496 StatusCode::UNAUTHORIZED => {
497 let body_text = response.text().await.unwrap_or_default();
498 if body_text.contains("oauth-token-invalid") {
499 return Err(AppError::OAuthTokenExpired);
500 }
501 error!("Unauthorized: {}", body_text);
502 return Err(AppError::Unauthorized);
503 }
504 _ => {
505 let body = response.text().await.unwrap_or_default();
506 error!("Request failed with status {}: {}", status, body);
507 return Err(AppError::Unexpected(status));
508 }
509 }
510 }
511}