fmp_rs/
production.rs

1//! Production-grade enhancements for the FMP client.
2
3use crate::error::{Error, Result};
4use reqwest::Client;
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7use tokio::time::sleep;
8use tracing::{debug, info, warn};
9
/// Settings that govern how quickly requests may be issued.
///
/// NOTE(review): no code visible in this file reads `max_burst`,
/// `backoff_multiplier` or `max_backoff` — confirm whether they are consumed
/// elsewhere.
#[derive(Clone, Debug)]
pub struct RateLimitConfig {
    /// Ceiling on the number of requests sent per second.
    pub max_requests_per_second: u32,
    /// Largest number of requests permitted in one burst.
    pub max_burst: u32,
    /// Factor by which the backoff grows between retries.
    pub backoff_multiplier: f64,
    /// Longest backoff wait that may ever be applied.
    pub max_backoff: Duration,
}
22
23impl Default for RateLimitConfig {
24    fn default() -> Self {
25        Self {
26            max_requests_per_second: 300, // FMP free tier limit
27            max_burst: 5,
28            backoff_multiplier: 2.0,
29            max_backoff: Duration::from_secs(60),
30        }
31    }
32}
33
/// Settings for the HTTP connection pool.
#[derive(Clone, Debug)]
pub struct PoolConfig {
    /// Per-host connection limit; the executor hands this to the HTTP client
    /// builder as the per-host idle-connection pool size.
    pub max_connections_per_host: usize,
    /// How long an idle pooled connection is kept before being dropped.
    pub idle_timeout: Duration,
    /// Longest time a connection may live.
    ///
    /// NOTE(review): no code visible in this file reads this field — confirm
    /// whether it is consumed elsewhere.
    pub max_lifetime: Duration,
    /// Whether HTTP/2 (prior-knowledge mode) should be enabled.
    pub http2_prior_knowledge: bool,
}
46
47impl Default for PoolConfig {
48    fn default() -> Self {
49        Self {
50            max_connections_per_host: 10,
51            idle_timeout: Duration::from_secs(90),
52            max_lifetime: Duration::from_secs(300),
53            http2_prior_knowledge: true,
54        }
55    }
56}
57
/// Counters and timing statistics collected by the executor.
#[derive(Clone, Debug, Default)]
pub struct Metrics {
    /// Number of HTTP attempts issued; every retry counts as another request.
    pub total_requests: u64,
    /// Number of requests that ultimately succeeded.
    pub successful_requests: u64,
    /// Number of requests that ultimately failed.
    pub failed_requests: u64,
    /// Number of retries performed across all requests.
    pub total_retries: u64,
    /// Running average response time, expressed in milliseconds.
    pub average_response_time_ms: f64,
    /// Number of responses that came back with HTTP 429.
    pub rate_limit_hits: u64,
}
74
/// Settings that drive the executor's retry-with-backoff behaviour.
#[derive(Clone, Debug)]
pub struct RetryConfig {
    /// Cap on how many times a failed request is retried.
    pub max_retries: u32,
    /// Delay used before the first retry.
    pub initial_delay: Duration,
    /// Ceiling for the delay between retries.
    pub max_delay: Duration,
    /// Factor by which the delay grows after each retry.
    pub multiplier: f64,
    /// Whether random jitter is mixed into retry delays so that many clients
    /// do not retry in lock-step (thundering herd).
    pub jitter: bool,
}
89
90impl Default for RetryConfig {
91    fn default() -> Self {
92        Self {
93            max_retries: 3,
94            initial_delay: Duration::from_millis(500),
95            max_delay: Duration::from_secs(30),
96            multiplier: 2.0,
97            jitter: true,
98        }
99    }
100}
101
102/// Production-grade request executor with retry logic and rate limiting
103pub struct ProductionExecutor {
104    client: Client,
105    rate_limit_config: RateLimitConfig,
106    retry_config: RetryConfig,
107    metrics: Arc<tokio::sync::Mutex<Metrics>>,
108    last_request_time: Arc<tokio::sync::Mutex<Option<Instant>>>,
109}
110
111impl ProductionExecutor {
112    /// Create a new production executor
113    pub fn new(
114        pool_config: PoolConfig,
115        rate_limit_config: RateLimitConfig,
116        retry_config: RetryConfig,
117        timeout: Duration,
118    ) -> Result<Self> {
119        let client = Client::builder()
120            .timeout(timeout)
121            .pool_max_idle_per_host(pool_config.max_connections_per_host)
122            .pool_idle_timeout(pool_config.idle_timeout)
123            .http2_prior_knowledge()
124            .user_agent("fmp-rs/1.0.0 (Production)")
125            .gzip(true)
126            .brotli(true)
127            .build()
128            .map_err(|e| Error::Custom(format!("Failed to create HTTP client: {}", e)))?;
129
130        Ok(Self {
131            client,
132            rate_limit_config,
133            retry_config,
134            metrics: Arc::new(tokio::sync::Mutex::new(Metrics::default())),
135            last_request_time: Arc::new(tokio::sync::Mutex::new(None)),
136        })
137    }
138
139    /// Execute a request with retry logic and rate limiting
140    pub async fn execute_request<T>(&self, request_builder: reqwest::RequestBuilder) -> Result<T>
141    where
142        T: serde::de::DeserializeOwned,
143    {
144        let start_time = Instant::now();
145        let mut attempt = 0;
146        let mut delay = self.retry_config.initial_delay;
147
148        loop {
149            // Apply rate limiting
150            self.apply_rate_limiting().await;
151
152            // Clone the request for retry attempts
153            let request = match request_builder.try_clone() {
154                Some(req) => req,
155                None => {
156                    return Err(Error::Custom(
157                        "Failed to clone request for retry".to_string(),
158                    ));
159                }
160            };
161
162            // Update metrics
163            self.update_request_metrics().await;
164
165            // Execute the request
166            match self.execute_single_request::<T>(request).await {
167                Ok(result) => {
168                    // Update success metrics
169                    self.update_success_metrics(start_time.elapsed()).await;
170                    if attempt > 0 {
171                        info!("Request succeeded after {} retries", attempt);
172                    }
173                    return Ok(result);
174                }
175                Err(err) => {
176                    attempt += 1;
177
178                    // Check if we should retry
179                    if attempt >= self.retry_config.max_retries || !self.should_retry(&err) {
180                        self.update_failure_metrics().await;
181                        return Err(err);
182                    }
183
184                    // Log retry attempt
185                    warn!(
186                        "Request failed (attempt {}), retrying in {:?}: {}",
187                        attempt, delay, err
188                    );
189                    self.update_retry_metrics().await;
190
191                    // Wait before retrying
192                    sleep(delay).await;
193
194                    // Calculate next delay with exponential backoff
195                    delay = std::cmp::min(
196                        Duration::from_millis(
197                            (delay.as_millis() as f64 * self.retry_config.multiplier) as u64,
198                        ),
199                        self.retry_config.max_delay,
200                    );
201
202                    // Add jitter if enabled
203                    if self.retry_config.jitter {
204                        use rand::Rng;
205                        let jitter_ms = rand::thread_rng().gen_range(0..=100);
206                        delay += Duration::from_millis(jitter_ms);
207                    }
208                }
209            }
210        }
211    }
212
213    /// Execute a single request attempt
214    async fn execute_single_request<T>(&self, request: reqwest::RequestBuilder) -> Result<T>
215    where
216        T: serde::de::DeserializeOwned,
217    {
218        let response = request.send().await?;
219
220        // Handle rate limiting
221        if response.status().as_u16() == 429 {
222            self.update_rate_limit_metrics().await;
223            return Err(Error::RateLimitExceeded);
224        }
225
226        // Handle other HTTP errors
227        if !response.status().is_success() {
228            let status = response.status().as_u16();
229            let error_text = response
230                .text()
231                .await
232                .unwrap_or_else(|_| "Unknown error".to_string());
233            return Err(Error::Api {
234                status,
235                message: error_text,
236            });
237        }
238
239        // Parse JSON response
240        let text = response.text().await?;
241        debug!("Response body: {}", text);
242
243        match serde_json::from_str::<T>(&text) {
244            Ok(data) => Ok(data),
245            Err(e) => {
246                warn!(
247                    "Failed to parse JSON response: {}. Response was: {}",
248                    e, text
249                );
250                Err(Error::Json(e))
251            }
252        }
253    }
254
255    /// Apply rate limiting logic
256    async fn apply_rate_limiting(&self) {
257        let mut last_request = self.last_request_time.lock().await;
258
259        if let Some(last_time) = *last_request {
260            let min_interval =
261                Duration::from_millis(1000 / self.rate_limit_config.max_requests_per_second as u64);
262            let elapsed = last_time.elapsed();
263
264            if elapsed < min_interval {
265                let sleep_time = min_interval - elapsed;
266                debug!("Rate limiting: sleeping for {:?}", sleep_time);
267                sleep(sleep_time).await;
268            }
269        }
270
271        *last_request = Some(Instant::now());
272    }
273
274    /// Determine if an error is retryable
275    fn should_retry(&self, error: &Error) -> bool {
276        match error {
277            Error::Http(reqwest_error) => {
278                // Retry on connection errors, timeouts, etc.
279                reqwest_error.is_timeout()
280                    || reqwest_error.is_connect()
281                    || reqwest_error.is_request()
282            }
283            Error::RateLimitExceeded => true,
284            Error::Api { status, .. } => {
285                // Retry on server errors, not client errors
286                *status >= 500
287            }
288            _ => false,
289        }
290    }
291
292    /// Update request metrics
293    async fn update_request_metrics(&self) {
294        let mut metrics = self.metrics.lock().await;
295        metrics.total_requests += 1;
296    }
297
298    /// Update success metrics
299    async fn update_success_metrics(&self, response_time: Duration) {
300        let mut metrics = self.metrics.lock().await;
301        metrics.successful_requests += 1;
302
303        // Update average response time
304        let total_responses = metrics.successful_requests + metrics.failed_requests;
305        let old_avg = metrics.average_response_time_ms;
306        let new_response_time = response_time.as_millis() as f64;
307
308        metrics.average_response_time_ms =
309            (old_avg * (total_responses - 1) as f64 + new_response_time) / total_responses as f64;
310    }
311
312    /// Update failure metrics
313    async fn update_failure_metrics(&self) {
314        let mut metrics = self.metrics.lock().await;
315        metrics.failed_requests += 1;
316    }
317
318    /// Update retry metrics
319    async fn update_retry_metrics(&self) {
320        let mut metrics = self.metrics.lock().await;
321        metrics.total_retries += 1;
322    }
323
324    /// Update rate limit metrics
325    async fn update_rate_limit_metrics(&self) {
326        let mut metrics = self.metrics.lock().await;
327        metrics.rate_limit_hits += 1;
328    }
329
330    /// Get current metrics
331    pub async fn get_metrics(&self) -> Metrics {
332        self.metrics.lock().await.clone()
333    }
334
335    /// Reset metrics
336    pub async fn reset_metrics(&self) {
337        let mut metrics = self.metrics.lock().await;
338        *metrics = Metrics::default();
339    }
340
341    /// Get the underlying HTTP client
342    pub fn client(&self) -> &Client {
343        &self.client
344    }
345}