ig_client/transport/
http_client.rs

1use async_trait::async_trait;
2use once_cell::sync::Lazy;
3use reqwest::{Client, Method, RequestBuilder, Response, StatusCode};
4use serde::{Serialize, de::DeserializeOwned};
5use std::sync::Arc;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::time::Duration;
8use tokio::sync::Semaphore;
9use tracing::{debug, error, info, warn};
10
11use crate::constants::USER_AGENT;
12use crate::utils::rate_limiter::app_non_trading_limiter;
13use crate::{config::Config, error::AppError, session::interface::IgSession};
14
15// Global semaphore to limit concurrent API requests
16// This ensures that we don't exceed rate limits by making too many
17// concurrent requests
18static API_SEMAPHORE: Lazy<Arc<Semaphore>> = Lazy::new(|| {
19    Arc::new(Semaphore::new(3)) // Allow up to 3 concurrent requests
20});
21
22// Flag to indicate if we're in a rate-limited situation
23static RATE_LIMITED: Lazy<Arc<AtomicBool>> = Lazy::new(|| Arc::new(AtomicBool::new(false)));
24
25// Default retry configuration
26const DEFAULT_MAX_RETRIES: u32 = 10; // Increase max retries to ensure all requests are processed
27const DEFAULT_INITIAL_BACKOFF_MS: u64 = 1000; // 1 second
28const DEFAULT_MAX_BACKOFF_MS: u64 = 60000; // 60 seconds max backoff
29const DEFAULT_BACKOFF_FACTOR: f64 = 2.0; // Exponential backoff factor
30
31/// Interface for the IG HTTP client
32#[async_trait]
33pub trait IgHttpClient: Send + Sync {
34    /// Makes an HTTP request to the IG API
35    async fn request<T, R>(
36        &self,
37        method: Method,
38        path: &str,
39        session: &IgSession,
40        body: Option<&T>,
41        version: &str,
42    ) -> Result<R, AppError>
43    where
44        for<'de> R: DeserializeOwned + 'static,
45        T: Serialize + Send + Sync + 'static;
46
47    /// Makes an unauthenticated HTTP request (for login)
48    async fn request_no_auth<T, R>(
49        &self,
50        method: Method,
51        path: &str,
52        body: Option<&T>,
53        version: &str,
54    ) -> Result<R, AppError>
55    where
56        for<'de> R: DeserializeOwned + 'static,
57        T: Serialize + Send + Sync + 'static;
58}
59
60/// Implementation of the HTTP client for IG
61pub struct IgHttpClientImpl {
62    config: Arc<Config>,
63    client: Client,
64    max_retries: u32,
65    initial_backoff_ms: u64,
66    max_backoff_ms: u64,
67    backoff_factor: f64,
68}
69
70impl IgHttpClientImpl {
71    /// Creates a new instance of the HTTP client
72    pub fn new(config: Arc<Config>) -> Self {
73        let client = Client::builder()
74            .user_agent(USER_AGENT)
75            .timeout(Duration::from_secs(config.rest_api.timeout))
76            .build()
77            .expect("Failed to create HTTP client");
78
79        Self {
80            config,
81            client,
82            max_retries: DEFAULT_MAX_RETRIES,
83            initial_backoff_ms: DEFAULT_INITIAL_BACKOFF_MS,
84            max_backoff_ms: DEFAULT_MAX_BACKOFF_MS,
85            backoff_factor: DEFAULT_BACKOFF_FACTOR,
86        }
87    }
88
89    /// Configure retry behavior
90    pub fn with_retry_config(
91        mut self,
92        max_retries: u32,
93        initial_backoff_ms: u64,
94        max_backoff_ms: u64,
95        backoff_factor: f64,
96    ) -> Self {
97        self.max_retries = max_retries;
98        self.initial_backoff_ms = initial_backoff_ms;
99        self.max_backoff_ms = max_backoff_ms;
100        self.backoff_factor = backoff_factor;
101        self
102    }
103
104    /// Calculate backoff duration for retry attempts with jitter
105    fn calculate_backoff_duration(&self, retry_count: u32) -> Duration {
106        use rand::Rng;
107        let base_backoff_ms =
108            (self.initial_backoff_ms as f64 * self.backoff_factor.powi(retry_count as i32)) as u64;
109        let capped_backoff_ms = base_backoff_ms.min(self.max_backoff_ms);
110
111        // Add jitter (±20%) to avoid thundering herd problem
112        let jitter_factor = rand::rng().random_range(0.8..1.2);
113        let jittered_backoff_ms = (capped_backoff_ms as f64 * jitter_factor) as u64;
114
115        Duration::from_millis(jittered_backoff_ms)
116    }
117
118    /// Check if an error is retryable
119    fn is_retryable_error(&self, error: &AppError) -> bool {
120        match error {
121            AppError::RateLimitExceeded => true,
122            AppError::Network(e) => {
123                // Retry on connection errors, timeouts, and server errors
124                e.is_timeout() || e.is_connect() || e.status().is_some_and(|s| s.is_server_error())
125            }
126            _ => false,
127        }
128    }
129
130    /// Builds the complete URL for a request
131    fn build_url(&self, path: &str) -> String {
132        format!(
133            "{}/{}",
134            self.config.rest_api.base_url.trim_end_matches('/'),
135            path.trim_start_matches('/')
136        )
137    }
138
139    /// Adds common headers to all requests
140    fn add_common_headers(&self, builder: RequestBuilder, version: &str) -> RequestBuilder {
141        builder
142            .header("X-IG-API-KEY", &self.config.credentials.api_key)
143            .header("Content-Type", "application/json; charset=UTF-8")
144            .header("Accept", "application/json; charset=UTF-8")
145            .header("Version", version)
146    }
147
148    /// Adds authentication headers to a request
149    fn add_auth_headers(&self, builder: RequestBuilder, session: &IgSession) -> RequestBuilder {
150        builder
151            .header("CST", &session.cst)
152            .header("X-SECURITY-TOKEN", &session.token)
153    }
154
155    /// Processes the HTTP response and handles rate limiting centrally
156    async fn process_response<R>(&self, response: Response) -> Result<R, AppError>
157    where
158        for<'de> R: DeserializeOwned + 'static,
159    {
160        let status = response.status();
161        let url = response.url().to_string();
162
163        // Handle rate limiting centrally
164        if status == StatusCode::TOO_MANY_REQUESTS {
165            self.handle_rate_limit(&url, "TOO_MANY_REQUESTS status code")
166                .await;
167            return Err(AppError::RateLimitExceeded);
168        }
169
170        match status {
171            StatusCode::OK | StatusCode::CREATED | StatusCode::ACCEPTED => {
172                let body = response.text().await?;
173                match serde_json::from_str::<R>(&body) {
174                    Ok(data) => Ok(data),
175                    Err(e) => {
176                        error!("Error deserializing response from {}: {}", url, e);
177                        error!("Response body: {}", body);
178                        Err(AppError::Json(e))
179                    }
180                }
181            }
182            StatusCode::UNAUTHORIZED => {
183                error!("Unauthorized request to {}", url);
184                Err(AppError::Unauthorized)
185            }
186            StatusCode::NOT_FOUND => {
187                error!("Resource not found at {}", url);
188                Err(AppError::NotFound)
189            }
190            StatusCode::FORBIDDEN => {
191                let body = response.text().await?;
192                if body.contains("exceeded-api-key-allowance")
193                    || body.contains("exceeded-account-allowance")
194                {
195                    self.handle_rate_limit(
196                        &url,
197                        "FORBIDDEN with exceeded-api-key-allowance or exceeded-account-allowance",
198                    )
199                    .await;
200                    Err(AppError::RateLimitExceeded)
201                } else {
202                    error!("Forbidden access to {}: {}", url, body);
203                    Err(AppError::Unauthorized)
204                }
205            }
206            _ => {
207                let body = response.text().await?;
208                error!(
209                    "Unexpected status code {} for request to {}: {}",
210                    status, url, body
211                );
212                Err(AppError::Unexpected(status))
213            }
214        }
215    }
216
217    /// Helper method to handle rate limiting
218    async fn handle_rate_limit(&self, url: &str, reason: &str) {
219        // Set the rate limited flag
220        RATE_LIMITED.store(true, Ordering::SeqCst);
221        error!("Rate limit exceeded for request to {} ({})", url, reason);
222
223        // Notify all rate limiters about the exceeded limit
224        // This will cause them to enforce a mandatory cooldown period
225        let non_trading_limiter = app_non_trading_limiter();
226        non_trading_limiter.notify_rate_limit_exceeded().await;
227
228        // Schedule a task to reset the flag after a delay
229        // Increased from 30 to 60 seconds to give more time for rate limit to reset
230        let rate_limited = RATE_LIMITED.clone();
231        tokio::spawn(async move {
232            tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
233            rate_limited.store(false, Ordering::SeqCst);
234            info!("Rate limit flag reset after 60 second cooldown");
235        });
236    }
237}
238
239#[async_trait]
240impl IgHttpClient for IgHttpClientImpl {
241    async fn request<T, R>(
242        &self,
243        method: Method,
244        path: &str,
245        session: &IgSession,
246        body: Option<&T>,
247        version: &str,
248    ) -> Result<R, AppError>
249    where
250        for<'de> R: DeserializeOwned + 'static,
251        T: Serialize + Send + Sync + 'static,
252    {
253        let url = self.build_url(path);
254        let method_str = method.as_str().to_string(); // Store method as string for logging
255        debug!("Making {} request to {}", method_str, url);
256
257        let mut retry_count = 0;
258
259        // Retry loop
260        loop {
261            // Check if we should retry
262            if retry_count > 0 {
263                if retry_count > self.max_retries {
264                    warn!(
265                        "Max retries ({}) exceeded for {} request to {}",
266                        self.max_retries, method_str, url
267                    );
268                    break; // Exit the loop and try one last time without retrying
269                }
270
271                // Calculate backoff duration
272                let backoff = self.calculate_backoff_duration(retry_count - 1);
273                info!(
274                    "Retry attempt {} for {} request to {}. Waiting for {:?} before retrying",
275                    retry_count, method_str, url, backoff
276                );
277                tokio::time::sleep(backoff).await;
278            }
279
280            // Check if we're currently rate limited
281            if RATE_LIMITED.load(Ordering::SeqCst) {
282                warn!("System is currently rate limited. Adding extra delay before request.");
283                // Add a longer extra delay if we're in a rate-limited situation
284                // Use retry count to increase delay for subsequent retries
285                let rate_limit_delay = 2000 + (retry_count * 1000) as u64;
286                tokio::time::sleep(tokio::time::Duration::from_millis(rate_limit_delay)).await;
287            }
288
289            // Acquire a permit from the semaphore to limit concurrent requests
290            // This ensures we don't overwhelm the API with too many concurrent requests
291            let permit = API_SEMAPHORE.acquire().await.unwrap();
292            debug!(
293                "Acquired API semaphore permit for {} request to {}",
294                method_str, url
295            );
296
297            // Respect rate limits before making the request
298            // This will handle the actual rate limiting based on request history
299            match session.respect_rate_limit().await {
300                Ok(()) => {}
301                Err(e) => {
302                    drop(permit);
303                    if self.is_retryable_error(&e) {
304                        retry_count += 1;
305                        continue;
306                    }
307                    return Err(e);
308                }
309            }
310
311            let mut builder = self.client.request(method.clone(), &url);
312            builder = self.add_common_headers(builder, version);
313            builder = self.add_auth_headers(builder, session);
314
315            if let Some(data) = body {
316                builder = builder.json(data);
317            }
318
319            // Send the request
320            let response_result = builder.send().await;
321
322            // Check for network errors
323            let response = match response_result {
324                Ok(resp) => resp,
325                Err(e) => {
326                    error!("Network error for {} request to {}: {}", method_str, url, e);
327                    // Release the permit before continuing
328                    drop(permit);
329
330                    // Check if we should retry
331                    let app_error = AppError::Network(e);
332                    if self.is_retryable_error(&app_error) {
333                        retry_count += 1;
334                        continue;
335                    }
336                    return Err(app_error);
337                }
338            };
339
340            // Process the response - rate limiting is handled inside process_response
341            let result = self.process_response::<R>(response).await;
342
343            // If the request was successful, reset the rate limited flag
344            if result.is_ok() && RATE_LIMITED.load(Ordering::SeqCst) {
345                RATE_LIMITED.store(false, Ordering::SeqCst);
346                info!("Rate limit flag reset after successful request to {}", url);
347            }
348
349            // Release the permit (this happens automatically when permit goes out of scope,
350            // but we do it explicitly for clarity)
351            drop(permit);
352
353            // Handle the result
354            match &result {
355                Err(e) if self.is_retryable_error(e) => {
356                    retry_count += 1;
357                    continue;
358                }
359                _ => return result,
360            }
361        }
362
363        // Final attempt without retrying
364        info!(
365            "Making final attempt for {} request to {} after max retries",
366            method_str, url
367        );
368
369        // Acquire a permit from the semaphore
370        let permit = API_SEMAPHORE.acquire().await.unwrap();
371
372        // Respect rate limits
373        session.respect_rate_limit().await?;
374
375        let mut builder = self.client.request(method, &url);
376        builder = self.add_common_headers(builder, version);
377        builder = self.add_auth_headers(builder, session);
378
379        if let Some(data) = body {
380            builder = builder.json(data);
381        }
382
383        let response = builder.send().await?;
384        let result = self.process_response::<R>(response).await;
385
386        drop(permit);
387        result
388    }
389
390    async fn request_no_auth<T, R>(
391        &self,
392        method: Method,
393        path: &str,
394        body: Option<&T>,
395        version: &str,
396    ) -> Result<R, AppError>
397    where
398        for<'de> R: DeserializeOwned + 'static,
399        T: Serialize + Send + Sync + 'static,
400    {
401        let url = self.build_url(path);
402        let method_str = method.as_str().to_string(); // Store method as string for logging
403        info!("Making unauthenticated {} request to {}", method_str, url);
404
405        let mut retry_count = 0;
406
407        // Retry loop
408        loop {
409            // Check if we should retry
410            if retry_count > 0 {
411                if retry_count > self.max_retries {
412                    warn!(
413                        "Max retries ({}) exceeded for unauthenticated {} request to {}",
414                        self.max_retries, method_str, url
415                    );
416                    break; // Exit the loop and try one last time without retrying
417                }
418
419                // Calculate backoff duration
420                let backoff = self.calculate_backoff_duration(retry_count - 1);
421                info!(
422                    "Retry attempt {} for unauthenticated {} request to {}. Waiting for {:?} before retrying",
423                    retry_count, method_str, url, backoff
424                );
425                tokio::time::sleep(backoff).await;
426            }
427
428            // Check if we're currently rate limited
429            if RATE_LIMITED.load(Ordering::SeqCst) {
430                warn!(
431                    "System is currently rate limited. Adding extra delay before unauthenticated request."
432                );
433                // Add a longer extra delay if we're in a rate-limited situation
434                // Use retry count to increase delay for subsequent retries
435                let rate_limit_delay = 1000 + (retry_count * 500) as u64;
436                tokio::time::sleep(tokio::time::Duration::from_millis(rate_limit_delay)).await;
437            }
438
439            // Acquire a permit from the semaphore to limit concurrent requests
440            let permit = API_SEMAPHORE.acquire().await.unwrap();
441            debug!(
442                "Acquired API semaphore permit for unauthenticated {} request to {}",
443                method_str, url
444            );
445
446            // Use the global app rate limiter for unauthenticated requests
447            // This is thread-safe and can be called from multiple threads concurrently
448            let limiter = app_non_trading_limiter();
449            limiter.wait().await;
450
451            let mut builder = self.client.request(method.clone(), &url);
452            builder = self.add_common_headers(builder, version);
453
454            if let Some(data) = body {
455                builder = builder.json(data);
456            }
457
458            // Send the request
459            let response_result = builder.send().await;
460
461            // Check for network errors
462            let response = match response_result {
463                Ok(resp) => resp,
464                Err(e) => {
465                    error!(
466                        "Network error for unauthenticated {} request to {}: {}",
467                        method_str, url, e
468                    );
469                    // Release the permit before continuing
470                    drop(permit);
471
472                    // Check if we should retry
473                    let app_error = AppError::Network(e);
474                    if self.is_retryable_error(&app_error) {
475                        retry_count += 1;
476                        continue;
477                    }
478                    return Err(app_error);
479                }
480            };
481
482            // Process the response - rate limiting is handled inside process_response
483            let result = self.process_response::<R>(response).await;
484
485            // If the request was successful, reset the rate limited flag
486            if result.is_ok() && RATE_LIMITED.load(Ordering::SeqCst) {
487                RATE_LIMITED.store(false, Ordering::SeqCst);
488                info!(
489                    "Rate limit flag reset after successful unauthenticated request to {}",
490                    url
491                );
492            }
493
494            // Release the permit
495            drop(permit);
496
497            // Handle the result
498            match &result {
499                Err(e) if self.is_retryable_error(e) => {
500                    retry_count += 1;
501                    continue;
502                }
503                _ => return result,
504            }
505        }
506
507        // Final attempt without retrying
508        info!(
509            "Making final attempt for unauthenticated {} request to {} after max retries",
510            method_str, url
511        );
512
513        // Acquire a permit from the semaphore
514        let permit = API_SEMAPHORE.acquire().await.unwrap();
515
516        // Use the global app rate limiter
517        let limiter = app_non_trading_limiter();
518        limiter.wait().await;
519
520        let mut builder = self.client.request(method, &url);
521        builder = self.add_common_headers(builder, version);
522
523        if let Some(data) = body {
524            builder = builder.json(data);
525        }
526
527        let response = builder.send().await?;
528        let result = self.process_response::<R>(response).await;
529
530        drop(permit);
531        result
532    }
533}