1use crate::constants::USER_AGENT;
4use crate::{
5 config::Config,
6 error::AuthError,
7 session::interface::{IgAuthenticator, IgSession},
8 session::response::{AccountSwitchRequest, AccountSwitchResponse, SessionResp, SessionV3Resp},
9 utils::rate_limiter::app_non_trading_limiter,
10};
11use async_trait::async_trait;
12use rand;
13use reqwest::{Client, StatusCode};
14use std::time::Duration;
15use tracing::{debug, error, info, trace, warn};
16
17pub struct IgAuth<'a> {
19 pub(crate) cfg: &'a Config,
20 http: Client,
21}
22
23impl<'a> IgAuth<'a> {
24 pub fn new(cfg: &'a Config) -> Self {
32 Self {
33 cfg,
34 http: Client::builder()
35 .user_agent(USER_AGENT)
36 .build()
37 .expect("reqwest client"),
38 }
39 }
40
41 fn rest_url(&self, path: &str) -> String {
43 format!(
44 "{}/{}",
45 self.cfg.rest_api.base_url.trim_end_matches('/'),
46 path.trim_start_matches('/')
47 )
48 }
49
50 #[allow(dead_code)]
61 fn get_client(&self) -> &Client {
62 &self.http
63 }
64
65 async fn refresh_oauth(
75 &self,
76 _session: &IgSession,
77 refresh_token: &str,
78 ) -> Result<IgSession, AuthError> {
79 let url = self.rest_url("session/refresh-token");
80 let api_key = self.cfg.credentials.api_key.trim();
81
82 debug!("OAuth token refresh request to URL: {}", url);
83 debug!("Using API key (length): {}", api_key.len());
84 debug!("Using refresh token (length): {}", refresh_token.len());
85
86 let body = serde_json::json!({
88 "refresh_token": refresh_token
89 });
90
91 let client = Client::builder()
93 .user_agent(USER_AGENT)
94 .build()
95 .expect("reqwest client");
96
97 let resp = match client
99 .post(url.clone())
100 .header("X-IG-API-KEY", api_key)
101 .header("Content-Type", "application/json")
102 .header("Version", "1")
103 .json(&body)
104 .send()
105 .await
106 {
107 Ok(resp) => resp,
108 Err(e) => {
109 error!("Failed to send OAuth refresh request: {}", e);
110 return Err(AuthError::Unexpected(StatusCode::INTERNAL_SERVER_ERROR));
111 }
112 };
113
114 debug!("OAuth refresh response status: {}", resp.status());
115 trace!("Response headers: {:#?}", resp.headers());
116
117 match resp.status() {
118 StatusCode::OK => {
119 let json: SessionV3Resp = resp.json().await?;
121
122 debug!("Successfully refreshed OAuth token");
123 debug!("Account ID: {}", json.account_id);
124 debug!(
125 "New access token length: {}",
126 json.oauth_token.access_token.len()
127 );
128 debug!("Token expires in: {} seconds", json.oauth_token.expires_in);
129
130 let new_session = IgSession::from_oauth(
132 json.oauth_token,
133 json.account_id,
134 json.client_id,
135 json.lightstreamer_endpoint,
136 self.cfg,
137 );
138
139 Ok(new_session)
140 }
141 StatusCode::UNAUTHORIZED => {
142 error!("OAuth refresh failed with UNAUTHORIZED");
143 let body = resp
144 .text()
145 .await
146 .unwrap_or_else(|_| "Could not read response body".to_string());
147 error!("Response body: {}", body);
148 Err(AuthError::BadCredentials)
149 }
150 other => {
151 error!("OAuth refresh failed with status: {}", other);
152 let body = resp
153 .text()
154 .await
155 .unwrap_or_else(|_| "Could not read response body".to_string());
156 error!("Response body: {}", body);
157 Err(AuthError::Unexpected(other))
158 }
159 }
160 }
161}
162
163#[async_trait]
164impl IgAuthenticator for IgAuth<'_> {
165 async fn login(&self) -> Result<IgSession, AuthError> {
166 let api_version = self.cfg.api_version.unwrap_or(3);
169
170 debug!("Using API version {} for authentication", api_version);
171
172 match api_version {
173 2 => self.login_v2().await,
174 3 => self.login_v3().await,
175 _ => {
176 error!("Invalid API version: {}. Must be 2 or 3", api_version);
177 Err(AuthError::Unexpected(StatusCode::BAD_REQUEST))
178 }
179 }
180 }
181
182 async fn login_v2(&self) -> Result<IgSession, AuthError> {
183 const MAX_RETRIES: u32 = 3;
185 const INITIAL_RETRY_DELAY_MS: u64 = 10000; let mut retry_count = 0;
188 let mut retry_delay_ms = INITIAL_RETRY_DELAY_MS;
189
190 loop {
191 let limiter = app_non_trading_limiter();
193 limiter.wait().await;
194
195 let url = self.rest_url("session");
197
198 let api_key = self.cfg.credentials.api_key.trim();
200 let username = self.cfg.credentials.username.trim();
201 let password = self.cfg.credentials.password.trim();
202
203 debug!("Login v2 request to URL: {}", url);
205 debug!("Using API key (length): {}", api_key.len());
206 debug!("Using username: {}", username);
207
208 if retry_count > 0 {
209 debug!("Retry attempt {} of {}", retry_count, MAX_RETRIES);
210 }
211
212 let body = serde_json::json!({
214 "identifier": username,
215 "password": password,
216 "encryptedPassword": false
217 });
218
219 debug!(
220 "Request body: {}",
221 serde_json::to_string(&body).unwrap_or_default()
222 );
223
224 let client = Client::builder()
226 .user_agent(USER_AGENT)
227 .build()
228 .expect("reqwest client");
229
230 let resp = match client
232 .post(url.clone())
233 .header("X-IG-API-KEY", api_key)
234 .header("Content-Type", "application/json; charset=UTF-8")
235 .header("Accept", "application/json; charset=UTF-8")
236 .header("Version", "2")
237 .json(&body)
238 .send()
239 .await
240 {
241 Ok(resp) => resp,
242 Err(e) => {
243 error!("Failed to send login request: {}", e);
244 return Err(AuthError::Unexpected(StatusCode::INTERNAL_SERVER_ERROR));
245 }
246 };
247
248 debug!("Login v2 response status: {}", resp.status());
250 trace!("Response headers: {:#?}", resp.headers());
251
252 match resp.status() {
253 StatusCode::OK => {
254 let cst = match resp.headers().get("CST") {
256 Some(value) => {
257 let cst_str = value
258 .to_str()
259 .map_err(|_| AuthError::Unexpected(StatusCode::OK))?;
260 debug!(
261 "Successfully obtained CST token of length: {}",
262 cst_str.len()
263 );
264 cst_str.to_owned()
265 }
266 None => {
267 error!("CST header not found in response");
268 return Err(AuthError::Unexpected(StatusCode::OK));
269 }
270 };
271
272 let token = match resp.headers().get("X-SECURITY-TOKEN") {
273 Some(value) => {
274 let token_str = value
275 .to_str()
276 .map_err(|_| AuthError::Unexpected(StatusCode::OK))?;
277 debug!(
278 "Successfully obtained X-SECURITY-TOKEN of length: {}",
279 token_str.len()
280 );
281 token_str.to_owned()
282 }
283 None => {
284 error!("X-SECURITY-TOKEN header not found in response");
285 return Err(AuthError::Unexpected(StatusCode::OK));
286 }
287 };
288
289 let json: SessionResp = resp.json().await?;
291 let account_id = json.account_id.clone();
292
293 let session =
296 IgSession::from_config(cst.clone(), token.clone(), account_id, self.cfg);
297
298 if let Some(stats) = session.get_rate_limit_stats().await {
300 debug!("Rate limiter initialized: {}", stats);
301 }
302
303 return Ok(session);
304 }
305 StatusCode::UNAUTHORIZED => {
306 error!("Authentication failed with UNAUTHORIZED");
307 let body = resp
308 .text()
309 .await
310 .unwrap_or_else(|_| "Could not read response body".to_string());
311 error!("Response body: {}", body);
312 return Err(AuthError::BadCredentials);
313 }
314 StatusCode::FORBIDDEN => {
315 error!("Authentication failed with FORBIDDEN");
316 let body = resp
317 .text()
318 .await
319 .unwrap_or_else(|_| "Could not read response body".to_string());
320
321 if body.contains("exceeded-api-key-allowance") {
322 error!("Rate Limit Exceeded: {}", &body);
323
324 if retry_count < MAX_RETRIES {
326 retry_count += 1;
327 let jitter = rand::random::<u64>() % 5000;
329 let delay = retry_delay_ms + jitter;
330 warn!(
331 "Rate limit exceeded. Retrying in {} ms (attempt {} of {})",
332 delay, retry_count, MAX_RETRIES
333 );
334
335 tokio::time::sleep(Duration::from_millis(delay)).await;
336
337 retry_delay_ms *= 2; continue;
340 } else {
341 error!(
342 "Maximum retry attempts ({}) reached. Giving up.",
343 MAX_RETRIES
344 );
345 return Err(AuthError::RateLimitExceeded);
346 }
347 }
348
349 error!("Response body: {}", body);
350 return Err(AuthError::BadCredentials);
351 }
352 other => {
353 error!("Authentication failed with unexpected status: {}", other);
354 let body = resp
355 .text()
356 .await
357 .unwrap_or_else(|_| "Could not read response body".to_string());
358 error!("Response body: {}", body);
359 return Err(AuthError::Unexpected(other));
360 }
361 }
362 }
363 }
364
365 async fn login_v3(&self) -> Result<IgSession, AuthError> {
366 const MAX_RETRIES: u32 = 3;
368 const INITIAL_RETRY_DELAY_MS: u64 = 10000; let mut retry_count = 0;
371 let mut retry_delay_ms = INITIAL_RETRY_DELAY_MS;
372
373 loop {
374 let limiter = app_non_trading_limiter();
376 limiter.wait().await;
377
378 let url = self.rest_url("session");
379
380 let api_key = self.cfg.credentials.api_key.trim();
382 let username = self.cfg.credentials.username.trim();
383 let password = self.cfg.credentials.password.trim();
384
385 debug!("Login v3 request to URL: {}", url);
387 debug!("Using API key (length): {}", api_key.len());
388 debug!("Using username: {}", username);
389
390 if retry_count > 0 {
391 debug!("Retry attempt {} of {}", retry_count, MAX_RETRIES);
392 }
393
394 let body = serde_json::json!({
396 "identifier": username,
397 "password": password,
398 "encryptedPassword": null
399 });
400
401 debug!(
402 "Request body: {}",
403 serde_json::to_string(&body).unwrap_or_default()
404 );
405
406 let client = Client::builder()
408 .user_agent(USER_AGENT)
409 .build()
410 .expect("reqwest client");
411
412 let resp = match client
414 .post(url.clone())
415 .header("X-IG-API-KEY", api_key)
416 .header("Content-Type", "application/json")
417 .header("Version", "3")
418 .json(&body)
419 .send()
420 .await
421 {
422 Ok(resp) => resp,
423 Err(e) => {
424 error!("Failed to send login v3 request: {}", e);
425 return Err(AuthError::Unexpected(StatusCode::INTERNAL_SERVER_ERROR));
426 }
427 };
428
429 debug!("Login v3 response status: {}", resp.status());
431 trace!("Response headers: {:#?}", resp.headers());
432
433 match resp.status() {
434 StatusCode::OK => {
435 let json: SessionV3Resp = resp.json().await?;
437
438 debug!("Successfully authenticated with OAuth");
439 debug!("Account ID: {}", json.account_id);
440 debug!("Client ID: {}", json.client_id);
441 debug!("Lightstreamer endpoint: {}", json.lightstreamer_endpoint);
442 debug!(
443 "Access token length: {}",
444 json.oauth_token.access_token.len()
445 );
446 debug!("Token expires in: {} seconds", json.oauth_token.expires_in);
447
448 let session = IgSession::from_oauth(
450 json.oauth_token,
451 json.account_id,
452 json.client_id,
453 json.lightstreamer_endpoint,
454 self.cfg,
455 );
456
457 if let Some(stats) = session.get_rate_limit_stats().await {
459 debug!("Rate limiter initialized: {}", stats);
460 }
461
462 return Ok(session);
463 }
464 StatusCode::UNAUTHORIZED => {
465 error!("Authentication failed with UNAUTHORIZED");
466 let body = resp
467 .text()
468 .await
469 .unwrap_or_else(|_| "Could not read response body".to_string());
470 error!("Response body: {}", body);
471 return Err(AuthError::BadCredentials);
472 }
473 StatusCode::FORBIDDEN => {
474 error!("Authentication failed with FORBIDDEN");
475 let body = resp
476 .text()
477 .await
478 .unwrap_or_else(|_| "Could not read response body".to_string());
479
480 if body.contains("exceeded-api-key-allowance") {
481 error!("Rate Limit Exceeded: {}", &body);
482
483 if retry_count < MAX_RETRIES {
484 retry_count += 1;
485 let jitter = rand::random::<u64>() % 5000;
486 let delay = retry_delay_ms + jitter;
487 warn!(
488 "Rate limit exceeded. Retrying in {} ms (attempt {} of {})",
489 delay, retry_count, MAX_RETRIES
490 );
491
492 tokio::time::sleep(Duration::from_millis(delay)).await;
493 retry_delay_ms *= 2;
494 continue;
495 } else {
496 error!(
497 "Maximum retry attempts ({}) reached. Giving up.",
498 MAX_RETRIES
499 );
500 return Err(AuthError::RateLimitExceeded);
501 }
502 }
503
504 error!("Response body: {}", body);
505 return Err(AuthError::BadCredentials);
506 }
507 other => {
508 error!("Authentication failed with unexpected status: {}", other);
509 let body = resp
510 .text()
511 .await
512 .unwrap_or_else(|_| "Could not read response body".to_string());
513 error!("Response body: {}", body);
514 return Err(AuthError::Unexpected(other));
515 }
516 }
517 }
518 }
519
520 async fn refresh(&self, sess: &IgSession) -> Result<IgSession, AuthError> {
522 if let Some(oauth_token) = &sess.oauth_token {
524 return self.refresh_oauth(sess, &oauth_token.refresh_token).await;
526 }
527
528 let url = self.rest_url("session/refresh-token");
530
531 let api_key = self.cfg.credentials.api_key.trim();
533
534 debug!("Refresh request to URL: {}", url);
536 debug!("Using API key (length): {}", api_key.len());
537 debug!("Using CST token (length): {}", sess.cst.len());
538 debug!("Using X-SECURITY-TOKEN (length): {}", sess.token.len());
539
540 let client = Client::builder()
542 .user_agent(USER_AGENT)
543 .build()
544 .expect("reqwest client");
545
546 let resp = client
547 .post(url)
548 .header("X-IG-API-KEY", api_key)
549 .header("CST", &sess.cst)
550 .header("X-SECURITY-TOKEN", &sess.token)
551 .header("Version", "3")
552 .header("Content-Type", "application/json; charset=UTF-8")
553 .header("Accept", "application/json; charset=UTF-8")
554 .send()
555 .await?;
556
557 debug!("Refresh response status: {}", resp.status());
559 trace!("Response headers: {:#?}", resp.headers());
560
561 match resp.status() {
562 StatusCode::OK => {
563 let cst = match resp.headers().get("CST") {
565 Some(value) => {
566 let cst_str = value
567 .to_str()
568 .map_err(|_| AuthError::Unexpected(StatusCode::OK))?;
569 debug!(
570 "Successfully obtained refreshed CST token of length: {}",
571 cst_str.len()
572 );
573 cst_str.to_owned()
574 }
575 None => {
576 error!("CST header not found in refresh response");
577 return Err(AuthError::Unexpected(StatusCode::OK));
578 }
579 };
580
581 let token = match resp.headers().get("X-SECURITY-TOKEN") {
582 Some(value) => {
583 let token_str = value
584 .to_str()
585 .map_err(|_| AuthError::Unexpected(StatusCode::OK))?;
586 debug!(
587 "Successfully obtained refreshed X-SECURITY-TOKEN of length: {}",
588 token_str.len()
589 );
590 token_str.to_owned()
591 }
592 None => {
593 error!("X-SECURITY-TOKEN header not found in refresh response");
594 return Err(AuthError::Unexpected(StatusCode::OK));
595 }
596 };
597
598 let json: SessionResp = resp.json().await?;
600 debug!("Refreshed session for Account ID: {}", json.account_id);
601
602 Ok(IgSession::from_config(
604 cst,
605 token,
606 json.account_id,
607 self.cfg,
608 ))
609 }
610 other => {
611 error!("Session refresh failed with status: {}", other);
612 let body = resp
613 .text()
614 .await
615 .unwrap_or_else(|_| "Could not read response body".to_string());
616 error!("Response body: {}", body);
617 Err(AuthError::Unexpected(other))
618 }
619 }
620 }
621
622 async fn switch_account(
623 &self,
624 session: &IgSession,
625 account_id: &str,
626 default_account: Option<bool>,
627 ) -> Result<IgSession, AuthError> {
628 if session.account_id == account_id {
630 debug!("Already on account ID: {}. No need to switch.", account_id);
631 return Ok(session.clone());
633 }
634
635 let url = self.rest_url("session");
636 let api_key = self.cfg.credentials.api_key.trim();
637
638 debug!("Account switch request to URL: {}", url);
640 debug!("Using API key (length): {}", api_key.len());
641 debug!("Switching to account ID: {}", account_id);
642 debug!("Set as default account: {:?}", default_account);
643
644 let body = AccountSwitchRequest {
646 account_id: account_id.to_string(),
647 default_account,
648 };
649
650 trace!(
651 "Request body: {}",
652 serde_json::to_string(&body).unwrap_or_default()
653 );
654
655 let client = Client::builder()
657 .user_agent(USER_AGENT)
658 .build()
659 .expect("reqwest client");
660
661 let mut request = client
663 .put(url)
664 .header("X-IG-API-KEY", api_key)
665 .header("Version", "1")
666 .header("Content-Type", "application/json; charset=UTF-8")
667 .header("Accept", "application/json; charset=UTF-8");
668
669 if let Some(oauth_token) = &session.oauth_token {
671 debug!("Using OAuth authentication for account switch");
673 request = request
674 .header(
675 "Authorization",
676 format!("Bearer {}", oauth_token.access_token),
677 )
678 .header("IG-ACCOUNT-ID", &session.account_id);
679 } else {
680 debug!("Using CST authentication for account switch");
682 request = request
683 .header("CST", &session.cst)
684 .header("X-SECURITY-TOKEN", &session.token);
685 }
686
687 let resp = request.json(&body).send().await?;
688
689 debug!("Account switch response status: {}", resp.status());
691 trace!("Response headers: {:#?}", resp.headers());
692
693 match resp.status() {
694 StatusCode::OK => {
695 let new_cst = match resp.headers().get("CST") {
702 Some(value) => {
703 let cst_str = value
704 .to_str()
705 .map_err(|_| AuthError::Unexpected(StatusCode::OK))?;
706 debug!(
707 "Successfully obtained new CST token of length: {}",
708 cst_str.len()
709 );
710 cst_str.to_owned()
711 }
712 None => {
713 warn!("CST header not found in switch response, using existing token");
714 session.cst.clone()
715 }
716 };
717
718 let new_token = match resp.headers().get("X-SECURITY-TOKEN") {
719 Some(value) => {
720 let token_str = value
721 .to_str()
722 .map_err(|_| AuthError::Unexpected(StatusCode::OK))?;
723 debug!(
724 "Successfully obtained new X-SECURITY-TOKEN of length: {}",
725 token_str.len()
726 );
727 token_str.to_owned()
728 }
729 None => {
730 warn!(
731 "X-SECURITY-TOKEN header not found in switch response, using existing token"
732 );
733 return Err(AuthError::Unexpected(StatusCode::NO_CONTENT));
734 }
735 };
736
737 let switch_response: AccountSwitchResponse = resp.json().await?;
739 info!("Account switch successful to: {}", account_id);
740 trace!("Account switch response: {:?}", switch_response);
741
742 if session.oauth_token.is_some() {
746 let mut new_session = session.clone();
748 new_session.account_id = account_id.to_string();
749 Ok(new_session)
750 } else {
751 Ok(IgSession::from_config(
753 new_cst,
754 new_token,
755 account_id.to_string(),
756 self.cfg,
757 ))
758 }
759 }
760 other => {
761 error!("Account switch failed with status: {}", other);
762 let body = resp
763 .text()
764 .await
765 .unwrap_or_else(|_| "Could not read response body".to_string());
766 error!("Response body: {}", body);
767
768 if other == StatusCode::UNAUTHORIZED {
771 warn!(
772 "Cannot switch to account ID: {}. The account might not exist or you don't have permission.",
773 account_id
774 );
775 }
776
777 Err(AuthError::Unexpected(other))
778 }
779 }
780 }
781
782 async fn relogin(&self, session: &IgSession) -> Result<IgSession, AuthError> {
783 let margin = chrono::Duration::minutes(30);
785
786 let is_expired = {
787 let timer = session.token_timer.lock().unwrap();
788 timer.is_expired_w_margin(margin)
789 };
790
791 if is_expired {
792 info!("Tokens are expired or close to expiring, performing re-login");
793 self.login().await
794 } else {
795 debug!("Tokens are still valid, reusing existing session");
796 Ok(session.clone())
797 }
798 }
799
800 async fn relogin_and_switch_account(
801 &self,
802 session: &IgSession,
803 account_id: &str,
804 default_account: Option<bool>,
805 ) -> Result<IgSession, AuthError> {
806 let session = self.relogin(session).await?;
807 debug!(
808 "Relogin check completed for account: {}, trying to switch to {}",
809 session.account_id, account_id
810 );
811
812 match self
813 .switch_account(&session, account_id, default_account)
814 .await
815 {
816 Ok(new_session) => Ok(new_session),
817 Err(e) => {
818 warn!("Could not switch to account {}: {:?}.", account_id, e);
819 Err(e)
820 }
821 }
822 }
823
824 async fn login_and_switch_account(
825 &self,
826 account_id: &str,
827 default_account: Option<bool>,
828 ) -> Result<IgSession, AuthError> {
829 let session = self.login().await?;
830 self.relogin_and_switch_account(&session, account_id, default_account)
831 .await
832 }
833}