1use crate::application::auth::{Auth, Session, WebsocketInfo};
8use crate::application::config::Config;
9use crate::application::rate_limiter::RateLimiter;
10use crate::error::AppError;
11use crate::model::retry::RetryConfig;
12use reqwest::Client as HttpInternalClient;
13use reqwest::{Client, Method, Response, StatusCode};
14use serde::Serialize;
15use serde::de::DeserializeOwned;
16use std::sync::Arc;
17use tokio::sync::RwLock;
18use tracing::{debug, error, warn};
19
20const USER_AGENT: &str = "ig-client/0.6.0";
21
22pub struct HttpClient {
31 auth: Arc<Auth>,
32 http_client: HttpInternalClient,
33 config: Arc<Config>,
34 rate_limiter: Arc<RwLock<RateLimiter>>,
35}
36
37impl HttpClient {
38 pub async fn new(config: Config) -> Result<Self, AppError> {
47 let config = Arc::new(config);
48
49 let http_client = HttpInternalClient::builder()
51 .user_agent(USER_AGENT)
52 .build()?;
53 let rate_limiter = Arc::new(RwLock::new(RateLimiter::new(&config.rate_limiter)));
54
55 let auth = Arc::new(Auth::new(config.clone()));
57
58 auth.login().await?;
60
61 Ok(Self {
62 auth,
63 http_client,
64 config,
65 rate_limiter,
66 })
67 }
68
69 pub fn new_lazy(config: Config) -> Self {
71 let config = Arc::new(config);
72
73 let http_client = HttpInternalClient::builder()
75 .user_agent(USER_AGENT)
76 .build()
77 .expect("Failed to create HTTP client");
78 let rate_limiter = Arc::new(RwLock::new(RateLimiter::new(&config.rate_limiter)));
79
80 let auth = Arc::new(Auth::new(config.clone()));
82
83 Self {
84 auth,
85 http_client,
86 config,
87 rate_limiter,
88 }
89 }
90
91 pub async fn get_ws_info(&self) -> WebsocketInfo {
96 self.auth.get_ws_info().await
97 }
98
99 pub async fn get<T: DeserializeOwned>(
101 &self,
102 path: &str,
103 version: Option<u8>,
104 ) -> Result<T, AppError> {
105 self.request(Method::GET, path, None::<()>, version).await
106 }
107
108 pub async fn post<B: Serialize, T: DeserializeOwned>(
110 &self,
111 path: &str,
112 body: B,
113 version: Option<u8>,
114 ) -> Result<T, AppError> {
115 self.request(Method::POST, path, Some(body), version).await
116 }
117
118 pub async fn put<B: Serialize, T: DeserializeOwned>(
120 &self,
121 path: &str,
122 body: B,
123 version: Option<u8>,
124 ) -> Result<T, AppError> {
125 self.request(Method::PUT, path, Some(body), version).await
126 }
127
128 pub async fn delete<T: DeserializeOwned>(
130 &self,
131 path: &str,
132 version: Option<u8>,
133 ) -> Result<T, AppError> {
134 self.request(Method::DELETE, path, None::<()>, version)
135 .await
136 }
137
138 pub async fn post_with_delete_method<B: Serialize, T: DeserializeOwned>(
151 &self,
152 path: &str,
153 body: B,
154 version: Option<u8>,
155 ) -> Result<T, AppError> {
156 match self
157 .request_internal_with_delete_method(path, &body, version)
158 .await
159 {
160 Ok(response) => self.parse_response(response).await,
161 Err(AppError::OAuthTokenExpired) => {
162 warn!("OAuth token expired, refreshing and retrying");
163 self.auth.refresh_token().await?;
164 let response = self
165 .request_internal_with_delete_method(path, &body, version)
166 .await?;
167 self.parse_response(response).await
168 }
169 Err(e) => Err(e),
170 }
171 }
172
173 pub async fn request<B: Serialize, T: DeserializeOwned>(
175 &self,
176 method: Method,
177 path: &str,
178 body: Option<B>,
179 version: Option<u8>,
180 ) -> Result<T, AppError> {
181 match self
182 .request_internal(method.clone(), path, &body, version)
183 .await
184 {
185 Ok(response) => self.parse_response(response).await,
186 Err(AppError::OAuthTokenExpired) => {
187 warn!("OAuth token expired, refreshing and retrying");
188 self.auth.refresh_token().await?;
189 let response = self.request_internal(method, path, &body, version).await?;
190 self.parse_response(response).await
191 }
192 Err(e) => Err(e),
193 }
194 }
195
196 async fn request_internal<B: Serialize>(
198 &self,
199 method: Method,
200 path: &str,
201 body: &Option<B>,
202 version: Option<u8>,
203 ) -> Result<Response, AppError> {
204 let session = self.auth.get_session().await?;
205
206 let url = if path.starts_with("http") {
207 path.to_string()
208 } else {
209 let path = path.trim_start_matches('/');
210 format!("{}/{}", self.config.rest_api.base_url, path)
211 };
212
213 let api_key = self.config.credentials.api_key.clone();
214 let version_owned = version.unwrap_or(1).to_string();
215 let auth_header_value;
216 let account_id;
217 let cst;
218 let x_security_token;
219
220 let mut headers = vec![
221 ("X-IG-API-KEY", api_key.as_str()),
222 ("Content-Type", "application/json; charset=UTF-8"),
223 ("Accept", "application/json; charset=UTF-8"),
224 ("Version", version_owned.as_str()),
225 ];
226
227 if let Some(oauth) = &session.oauth_token {
228 auth_header_value = format!("Bearer {}", oauth.access_token);
229 account_id = session.account_id.clone();
230 headers.push(("Authorization", auth_header_value.as_str()));
231 headers.push(("IG-ACCOUNT-ID", account_id.as_str()));
232 } else if let (Some(cst_val), Some(token_val)) = (&session.cst, &session.x_security_token) {
233 cst = cst_val.clone();
234 x_security_token = token_val.clone();
235 headers.push(("CST", cst.as_str()));
236 headers.push(("X-SECURITY-TOKEN", x_security_token.as_str()));
237 }
238
239 make_http_request(
240 &self.http_client,
241 self.rate_limiter.clone(),
242 method,
243 &url,
244 headers,
245 body,
246 RetryConfig::infinite(),
247 )
248 .await
249 }
250
251 async fn request_internal_with_delete_method<B: Serialize>(
255 &self,
256 path: &str,
257 body: &B,
258 version: Option<u8>,
259 ) -> Result<Response, AppError> {
260 let session = self.auth.get_session().await?;
261
262 let url = if path.starts_with("http") {
263 path.to_string()
264 } else {
265 let path = path.trim_start_matches('/');
266 format!("{}/{}", self.config.rest_api.base_url, path)
267 };
268
269 let api_key = self.config.credentials.api_key.clone();
270 let version_owned = version.unwrap_or(1).to_string();
271 let auth_header_value;
272 let account_id;
273 let cst;
274 let x_security_token;
275
276 let mut headers = vec![
277 ("X-IG-API-KEY", api_key.as_str()),
278 ("Content-Type", "application/json; charset=UTF-8"),
279 ("Accept", "application/json; charset=UTF-8"),
280 ("Version", version_owned.as_str()),
281 ("_method", "DELETE"), ];
283
284 if let Some(oauth) = &session.oauth_token {
285 auth_header_value = format!("Bearer {}", oauth.access_token);
286 account_id = session.account_id.clone();
287 headers.push(("Authorization", auth_header_value.as_str()));
288 headers.push(("IG-ACCOUNT-ID", account_id.as_str()));
289 } else if let (Some(cst_val), Some(token_val)) = (&session.cst, &session.x_security_token) {
290 cst = cst_val.clone();
291 x_security_token = token_val.clone();
292 headers.push(("CST", cst.as_str()));
293 headers.push(("X-SECURITY-TOKEN", x_security_token.as_str()));
294 }
295
296 make_http_request(
297 &self.http_client,
298 self.rate_limiter.clone(),
299 Method::POST, &url,
301 headers,
302 &Some(body),
303 RetryConfig::infinite(),
304 )
305 .await
306 }
307
308 async fn parse_response<T: DeserializeOwned>(&self, response: Response) -> Result<T, AppError> {
310 Ok(response.json().await?)
311 }
312
313 pub async fn switch_account(
315 &self,
316 account_id: &str,
317 default_account: Option<bool>,
318 ) -> Result<(), AppError> {
319 self.auth
320 .switch_account(account_id, default_account)
321 .await?;
322 Ok(())
323 }
324
325 pub async fn get_session(&self) -> Result<Session, AppError> {
327 self.auth.get_session().await
328 }
329
330 pub async fn logout(&self) -> Result<(), AppError> {
332 self.auth.logout().await
333 }
334
335 pub fn auth(&self) -> &Auth {
337 &self.auth
338 }
339}
340
341impl Default for HttpClient {
342 fn default() -> Self {
343 let config = Config::default();
344 Self::new_lazy(config)
345 }
346}
347
348pub async fn make_http_request<B: Serialize>(
428 client: &Client,
429 rate_limiter: Arc<RwLock<RateLimiter>>,
430 method: Method,
431 url: &str,
432 headers: Vec<(&str, &str)>,
433 body: &Option<B>,
434 retry_config: RetryConfig,
435) -> Result<Response, AppError> {
436 let mut retry_count = 0;
437 let max_retries = retry_config.max_retries();
438 let delay_secs = retry_config.delay_secs();
439
440 loop {
441 {
443 let limiter = rate_limiter.read().await;
444 limiter.wait().await;
445 }
446
447 debug!("{} {}", method, url);
448
449 let mut request = client.request(method.clone(), url);
451
452 for (name, value) in &headers {
454 request = request.header(*name, *value);
455 }
456
457 if let Some(b) = body {
459 request = request.json(b);
460 }
461
462 let response = request.send().await?;
464 let status = response.status();
465 debug!("Response status: {}", status);
466
467 if status.is_success() {
468 return Ok(response);
469 }
470
471 match status {
472 StatusCode::FORBIDDEN => {
473 let body_text = response.text().await.unwrap_or_default();
474 if body_text.contains("exceeded-api-key-allowance")
475 || body_text.contains("exceeded-account-allowance")
476 || body_text.contains("exceeded-account-trading-allowance")
477 || body_text.contains("exceeded-account-historical-data-allowance")
478 {
479 retry_count += 1;
480
481 if max_retries > 0 && retry_count > max_retries {
483 error!(
484 "Rate limit exceeded after {} attempts. Max retries ({}) reached.",
485 retry_count - 1,
486 max_retries
487 );
488 return Err(AppError::RateLimitExceeded);
489 }
490
491 warn!(
492 "Rate limit exceeded (attempt {}): {}. Waiting {} seconds before retry...",
493 retry_count, body_text, delay_secs
494 );
495 tokio::time::sleep(tokio::time::Duration::from_secs(delay_secs)).await;
496 continue; }
498 error!("Forbidden: {}", body_text);
499 return Err(AppError::Unexpected(status));
500 }
501 StatusCode::UNAUTHORIZED => {
502 let body_text = response.text().await.unwrap_or_default();
503 if body_text.contains("oauth-token-invalid") {
504 return Err(AppError::OAuthTokenExpired);
505 }
506 error!("Unauthorized: {}", body_text);
507 return Err(AppError::Unauthorized);
508 }
509 _ => {
510 let body = response.text().await.unwrap_or_default();
511 error!("Request failed with status {}: {}", status, body);
512 return Err(AppError::Unexpected(status));
513 }
514 }
515 }
516}