1use crate::application::auth::{Auth, Session, WebsocketInfo};
8use crate::application::config::Config;
9use crate::application::rate_limiter::RateLimiter;
10use crate::error::AppError;
11use crate::model::retry::RetryConfig;
12use reqwest::Client as HttpInternalClient;
13use reqwest::{Client, Method, Response, StatusCode};
14use serde::Serialize;
15use serde::de::DeserializeOwned;
16use std::sync::Arc;
17use tokio::sync::RwLock;
18use tracing::{debug, error, warn};
19
/// Value of the `User-Agent` header configured on the underlying reqwest
/// client built by `HttpClient::new` and `HttpClient::new_lazy`.
const USER_AGENT: &str = "ig-client/0.6.0";
21
/// Authenticated HTTP client.
///
/// Bundles the authentication manager, the underlying reqwest client, the
/// shared configuration and the rate limiter that every request issued through
/// this type goes through.
pub struct HttpClient {
    /// Authentication manager; supplies the session used to build the
    /// authentication headers and performs login, logout, token refresh and
    /// account switching.
    auth: Arc<Auth>,
    /// Underlying reqwest client, built with the `USER_AGENT` user agent.
    http_client: HttpInternalClient,
    /// Shared configuration; the REST base URL and the API key are read from it.
    config: Arc<Config>,
    /// Rate limiter handed to `make_http_request`, which waits on it before
    /// every request attempt.
    rate_limiter: Arc<RwLock<RateLimiter>>,
}
36
37impl HttpClient {
38 pub async fn new(config: Config) -> Result<Self, AppError> {
47 let config = Arc::new(config);
48
49 let http_client = HttpInternalClient::builder()
51 .user_agent(USER_AGENT)
52 .build()?;
53 let rate_limiter = Arc::new(RwLock::new(RateLimiter::new(&config.rate_limiter)));
54
55 let auth = Arc::new(Auth::new(config.clone()));
57
58 auth.login().await?;
60
61 Ok(Self {
62 auth,
63 http_client,
64 config,
65 rate_limiter,
66 })
67 }
68
69 pub fn new_lazy(config: Config) -> Result<Self, AppError> {
74 let config = Arc::new(config);
75
76 let http_client = HttpInternalClient::builder()
78 .user_agent(USER_AGENT)
79 .build()?;
80 let rate_limiter = Arc::new(RwLock::new(RateLimiter::new(&config.rate_limiter)));
81
82 let auth = Arc::new(Auth::new(config.clone()));
84
85 Ok(Self {
86 auth,
87 http_client,
88 config,
89 rate_limiter,
90 })
91 }
92
93 pub async fn get_ws_info(&self) -> WebsocketInfo {
98 self.auth.get_ws_info().await
99 }
100
101 pub async fn get<T: DeserializeOwned>(
103 &self,
104 path: &str,
105 version: Option<u8>,
106 ) -> Result<T, AppError> {
107 self.request(Method::GET, path, None::<()>, version).await
108 }
109
110 pub async fn post<B: Serialize, T: DeserializeOwned>(
112 &self,
113 path: &str,
114 body: B,
115 version: Option<u8>,
116 ) -> Result<T, AppError> {
117 self.request(Method::POST, path, Some(body), version).await
118 }
119
120 pub async fn put<B: Serialize, T: DeserializeOwned>(
122 &self,
123 path: &str,
124 body: B,
125 version: Option<u8>,
126 ) -> Result<T, AppError> {
127 self.request(Method::PUT, path, Some(body), version).await
128 }
129
130 pub async fn delete<T: DeserializeOwned>(
132 &self,
133 path: &str,
134 version: Option<u8>,
135 ) -> Result<T, AppError> {
136 self.request(Method::DELETE, path, None::<()>, version)
137 .await
138 }
139
140 pub async fn post_with_delete_method<B: Serialize, T: DeserializeOwned>(
153 &self,
154 path: &str,
155 body: B,
156 version: Option<u8>,
157 ) -> Result<T, AppError> {
158 match self
159 .request_internal_with_delete_method(path, &body, version)
160 .await
161 {
162 Ok(response) => self.parse_response(response).await,
163 Err(AppError::OAuthTokenExpired) => {
164 warn!("OAuth token expired, refreshing and retrying");
165 self.auth.refresh_token().await?;
166 let response = self
167 .request_internal_with_delete_method(path, &body, version)
168 .await?;
169 self.parse_response(response).await
170 }
171 Err(e) => Err(e),
172 }
173 }
174
175 pub async fn request<B: Serialize, T: DeserializeOwned>(
177 &self,
178 method: Method,
179 path: &str,
180 body: Option<B>,
181 version: Option<u8>,
182 ) -> Result<T, AppError> {
183 match self
184 .request_internal(method.clone(), path, &body, version)
185 .await
186 {
187 Ok(response) => self.parse_response(response).await,
188 Err(AppError::OAuthTokenExpired) => {
189 warn!("OAuth token expired, refreshing and retrying");
190 self.auth.refresh_token().await?;
191 let response = self.request_internal(method, path, &body, version).await?;
192 self.parse_response(response).await
193 }
194 Err(e) => Err(e),
195 }
196 }
197
198 async fn request_internal<B: Serialize>(
200 &self,
201 method: Method,
202 path: &str,
203 body: &Option<B>,
204 version: Option<u8>,
205 ) -> Result<Response, AppError> {
206 let session = self.auth.get_session().await?;
207
208 let url = if path.starts_with("http") {
209 path.to_string()
210 } else {
211 let path = path.trim_start_matches('/');
212 format!("{}/{}", self.config.rest_api.base_url, path)
213 };
214
215 let api_key = self.config.credentials.api_key.clone();
216 let version_owned = version.unwrap_or(1).to_string();
217 let auth_header_value;
218 let account_id;
219 let cst;
220 let x_security_token;
221
222 let mut headers = vec![
223 ("X-IG-API-KEY", api_key.as_str()),
224 ("Content-Type", "application/json; charset=UTF-8"),
225 ("Accept", "application/json; charset=UTF-8"),
226 ("Version", version_owned.as_str()),
227 ];
228
229 if let Some(oauth) = &session.oauth_token {
230 auth_header_value = format!("Bearer {}", oauth.access_token);
231 account_id = session.account_id.clone();
232 headers.push(("Authorization", auth_header_value.as_str()));
233 headers.push(("IG-ACCOUNT-ID", account_id.as_str()));
234 } else if let (Some(cst_val), Some(token_val)) = (&session.cst, &session.x_security_token) {
235 cst = cst_val.clone();
236 x_security_token = token_val.clone();
237 headers.push(("CST", cst.as_str()));
238 headers.push(("X-SECURITY-TOKEN", x_security_token.as_str()));
239 }
240
241 make_http_request(
242 &self.http_client,
243 self.rate_limiter.clone(),
244 method,
245 &url,
246 headers,
247 body,
248 RetryConfig::infinite(),
249 )
250 .await
251 }
252
253 async fn request_internal_with_delete_method<B: Serialize>(
257 &self,
258 path: &str,
259 body: &B,
260 version: Option<u8>,
261 ) -> Result<Response, AppError> {
262 let session = self.auth.get_session().await?;
263
264 let url = if path.starts_with("http") {
265 path.to_string()
266 } else {
267 let path = path.trim_start_matches('/');
268 format!("{}/{}", self.config.rest_api.base_url, path)
269 };
270
271 let api_key = self.config.credentials.api_key.clone();
272 let version_owned = version.unwrap_or(1).to_string();
273 let auth_header_value;
274 let account_id;
275 let cst;
276 let x_security_token;
277
278 let mut headers = vec![
279 ("X-IG-API-KEY", api_key.as_str()),
280 ("Content-Type", "application/json; charset=UTF-8"),
281 ("Accept", "application/json; charset=UTF-8"),
282 ("Version", version_owned.as_str()),
283 ("_method", "DELETE"), ];
285
286 if let Some(oauth) = &session.oauth_token {
287 auth_header_value = format!("Bearer {}", oauth.access_token);
288 account_id = session.account_id.clone();
289 headers.push(("Authorization", auth_header_value.as_str()));
290 headers.push(("IG-ACCOUNT-ID", account_id.as_str()));
291 } else if let (Some(cst_val), Some(token_val)) = (&session.cst, &session.x_security_token) {
292 cst = cst_val.clone();
293 x_security_token = token_val.clone();
294 headers.push(("CST", cst.as_str()));
295 headers.push(("X-SECURITY-TOKEN", x_security_token.as_str()));
296 }
297
298 make_http_request(
299 &self.http_client,
300 self.rate_limiter.clone(),
301 Method::POST, &url,
303 headers,
304 &Some(body),
305 RetryConfig::infinite(),
306 )
307 .await
308 }
309
310 async fn parse_response<T: DeserializeOwned>(&self, response: Response) -> Result<T, AppError> {
312 Ok(response.json().await?)
313 }
314
315 pub async fn switch_account(
317 &self,
318 account_id: &str,
319 default_account: Option<bool>,
320 ) -> Result<(), AppError> {
321 self.auth
322 .switch_account(account_id, default_account)
323 .await?;
324 Ok(())
325 }
326
327 pub async fn get_session(&self) -> Result<Session, AppError> {
329 self.auth.get_session().await
330 }
331
332 pub async fn logout(&self) -> Result<(), AppError> {
334 self.auth.logout().await
335 }
336
337 pub fn auth(&self) -> &Auth {
339 &self.auth
340 }
341}
342
343impl Default for HttpClient {
344 fn default() -> Self {
345 let config = Config::default();
346 Self::new_lazy(config).expect("failed to create default HTTP client")
349 }
350}
351
352pub async fn make_http_request<B: Serialize>(
432 client: &Client,
433 rate_limiter: Arc<RwLock<RateLimiter>>,
434 method: Method,
435 url: &str,
436 headers: Vec<(&str, &str)>,
437 body: &Option<B>,
438 retry_config: RetryConfig,
439) -> Result<Response, AppError> {
440 let mut retry_count = 0;
441 let max_retries = retry_config.max_retries();
442 let delay_secs = retry_config.delay_secs();
443
444 loop {
445 {
447 let limiter = rate_limiter.read().await;
448 limiter.wait().await;
449 }
450
451 debug!("{} {}", method, url);
452
453 let mut request = client.request(method.clone(), url);
455
456 for (name, value) in &headers {
458 request = request.header(*name, *value);
459 }
460
461 if let Some(b) = body {
463 request = request.json(b);
464 }
465
466 let response = request.send().await?;
468 let status = response.status();
469 debug!("Response status: {}", status);
470
471 if status.is_success() {
472 return Ok(response);
473 }
474
475 match status {
476 StatusCode::FORBIDDEN => {
477 let body_text = response.text().await.unwrap_or_default();
478 if body_text.contains("exceeded-api-key-allowance")
479 || body_text.contains("exceeded-account-allowance")
480 || body_text.contains("exceeded-account-trading-allowance")
481 || body_text.contains("exceeded-account-historical-data-allowance")
482 {
483 retry_count += 1;
484
485 if max_retries > 0 && retry_count > max_retries {
487 error!(
488 "Rate limit exceeded after {} attempts. Max retries ({}) reached.",
489 retry_count - 1,
490 max_retries
491 );
492 return Err(AppError::RateLimitExceeded);
493 }
494
495 warn!(
496 "Rate limit exceeded (attempt {}): {}. Waiting {} seconds before retry...",
497 retry_count, body_text, delay_secs
498 );
499 tokio::time::sleep(tokio::time::Duration::from_secs(delay_secs)).await;
500 continue; }
502 error!("Forbidden: {}", body_text);
503 return Err(AppError::Unexpected(status));
504 }
505 StatusCode::UNAUTHORIZED => {
506 let body_text = response.text().await.unwrap_or_default();
507 if body_text.contains("oauth-token-invalid") {
508 return Err(AppError::OAuthTokenExpired);
509 }
510 error!("Unauthorized: {}", body_text);
511 return Err(AppError::Unauthorized);
512 }
513 _ => {
514 let body = response.text().await.unwrap_or_default();
515 error!("Request failed with status {}: {}", status, body);
516 return Err(AppError::Unexpected(status));
517 }
518 }
519 }
520}