//! Production-grade HTTP execution: connection pooling, client-side rate
//! limiting, retries with exponential backoff, and request metrics.

use crate::error::{Error, Result};
use reqwest::Client;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::time::sleep;
use tracing::{debug, info, warn};

/// Client-side rate limiting configuration.
///
/// Only `max_requests_per_second` is currently enforced by the executor; the
/// remaining fields are reserved for burst and backoff tuning.
#[derive(Debug, Clone)]
pub struct RateLimitConfig {
    /// Maximum number of requests per second.
    pub max_requests_per_second: u32,
    /// Maximum burst of requests allowed above the steady rate.
    pub max_burst: u32,
    /// Multiplier applied to the backoff delay after repeated rate-limit hits.
    pub backoff_multiplier: f64,
    /// Upper bound on the rate-limit backoff delay.
    pub max_backoff: Duration,
}

impl Default for RateLimitConfig {
    fn default() -> Self {
        Self {
            max_requests_per_second: 300,
            max_burst: 5,
            backoff_multiplier: 2.0,
            max_backoff: Duration::from_secs(60),
        }
    }
}

/// Connection pool configuration for the underlying HTTP client.
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Maximum number of idle connections kept per host.
    pub max_connections_per_host: usize,
    /// How long an idle connection is kept alive in the pool.
    pub idle_timeout: Duration,
    /// Maximum lifetime of a pooled connection (not currently applied to the client).
    pub max_lifetime: Duration,
    /// Assume HTTP/2 without protocol negotiation.
    pub http2_prior_knowledge: bool,
}

impl Default for PoolConfig {
    fn default() -> Self {
        Self {
            max_connections_per_host: 10,
            idle_timeout: Duration::from_secs(90),
            max_lifetime: Duration::from_secs(300),
            http2_prior_knowledge: true,
        }
    }
}

/// Aggregated request metrics collected by the executor.
#[derive(Debug, Clone, Default)]
pub struct Metrics {
    /// Total number of request attempts, including retries.
    pub total_requests: u64,
    /// Requests that ultimately succeeded.
    pub successful_requests: u64,
    /// Requests that ultimately failed.
    pub failed_requests: u64,
    /// Number of retry attempts performed.
    pub total_retries: u64,
    /// Running average end-to-end latency of successful requests, in milliseconds.
    pub average_response_time_ms: f64,
    /// Number of HTTP 429 (rate limited) responses observed.
    pub rate_limit_hits: u64,
}

/// Retry behaviour for failed requests.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum number of retries after the initial attempt.
    pub max_retries: u32,
    /// Delay before the first retry.
    pub initial_delay: Duration,
    /// Upper bound on the retry delay.
    pub max_delay: Duration,
    /// Exponential backoff multiplier applied to the delay after each retry.
    pub multiplier: f64,
    /// Whether to add random jitter to each retry delay.
    pub jitter: bool,
}

impl Default for RetryConfig {
    fn default() -> Self {
        Self {
            max_retries: 3,
            initial_delay: Duration::from_millis(500),
            max_delay: Duration::from_secs(30),
            multiplier: 2.0,
            jitter: true,
        }
    }
}

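/// HTTP request executor with connection pooling, client-side rate limiting,
/// retries with exponential backoff, and metrics collection.
///
/// Illustrative usage sketch (assumes an async context and that the types are
/// imported from wherever this module lives; the URL is a placeholder):
///
/// ```ignore
/// let executor = ProductionExecutor::new(
///     PoolConfig::default(),
///     RateLimitConfig::default(),
///     RetryConfig::default(),
///     Duration::from_secs(30),
/// )?;
///
/// // Build a request on the pooled client, then execute it with rate limiting
/// // and retries applied.
/// let request = executor.client().get("https://example.com/api/quote");
/// let body: serde_json::Value = executor.execute_request(request).await?;
/// ```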
pub struct ProductionExecutor {
    /// Pooled reqwest client shared across requests.
    client: Client,
    /// Rate limiting configuration.
    rate_limit_config: RateLimitConfig,
    /// Retry/backoff configuration.
    retry_config: RetryConfig,
    /// Aggregated request metrics.
    metrics: Arc<tokio::sync::Mutex<Metrics>>,
    /// Timestamp of the most recent request, used for request spacing.
    last_request_time: Arc<tokio::sync::Mutex<Option<Instant>>>,
}

impl ProductionExecutor {
    /// Builds an executor with a pooled HTTP client configured from `pool_config`.
    pub fn new(
        pool_config: PoolConfig,
        rate_limit_config: RateLimitConfig,
        retry_config: RetryConfig,
        timeout: Duration,
    ) -> Result<Self> {
        let mut builder = Client::builder()
            .timeout(timeout)
            .pool_max_idle_per_host(pool_config.max_connections_per_host)
            .pool_idle_timeout(pool_config.idle_timeout)
            .user_agent("fmp-rs/1.0.0 (Production)")
            .gzip(true)
            .brotli(true);

        // Only force HTTP/2 prior knowledge when the pool config asks for it.
        if pool_config.http2_prior_knowledge {
            builder = builder.http2_prior_knowledge();
        }

        let client = builder
            .build()
            .map_err(|e| Error::Custom(format!("Failed to create HTTP client: {}", e)))?;

        Ok(Self {
            client,
            rate_limit_config,
            retry_config,
            metrics: Arc::new(tokio::sync::Mutex::new(Metrics::default())),
            last_request_time: Arc::new(tokio::sync::Mutex::new(None)),
        })
    }

    /// Executes a request with rate limiting and retries, deserializing the
    /// JSON response body into `T`.
    pub async fn execute_request<T>(&self, request_builder: reqwest::RequestBuilder) -> Result<T>
    where
        T: serde::de::DeserializeOwned,
    {
        let start_time = Instant::now();
        let mut attempt = 0;
        let mut delay = self.retry_config.initial_delay;

        loop {
            self.apply_rate_limiting().await;

            // The builder must be cloneable so it can be re-sent on retry
            // (streaming bodies cannot be cloned).
            let request = match request_builder.try_clone() {
                Some(req) => req,
                None => {
                    return Err(Error::Custom(
                        "Failed to clone request for retry".to_string(),
                    ));
                }
            };

            self.update_request_metrics().await;

            match self.execute_single_request::<T>(request).await {
                Ok(result) => {
                    self.update_success_metrics(start_time.elapsed()).await;
                    if attempt > 0 {
                        info!("Request succeeded after {} retries", attempt);
                    }
                    return Ok(result);
                }
                Err(err) => {
                    attempt += 1;

                    // Give up once the configured number of retries is exhausted
                    // or the error is not retryable.
                    if attempt > self.retry_config.max_retries || !self.should_retry(&err) {
                        self.update_failure_metrics().await;
                        return Err(err);
                    }

                    warn!(
                        "Request failed (attempt {}), retrying in {:?}: {}",
                        attempt, delay, err
                    );
                    self.update_retry_metrics().await;

                    sleep(delay).await;

                    // Exponential backoff, capped at the configured maximum delay.
                    delay = std::cmp::min(
                        Duration::from_millis(
                            (delay.as_millis() as f64 * self.retry_config.multiplier) as u64,
                        ),
                        self.retry_config.max_delay,
                    );

                    // Add up to 100ms of random jitter to avoid thundering herds.
                    if self.retry_config.jitter {
                        use rand::Rng;
                        let jitter_ms = rand::thread_rng().gen_range(0..=100);
                        delay += Duration::from_millis(jitter_ms);
                    }
                }
            }
        }
    }

    /// Sends a single request and deserializes the JSON response body into `T`.
    async fn execute_single_request<T>(&self, request: reqwest::RequestBuilder) -> Result<T>
    where
        T: serde::de::DeserializeOwned,
    {
        let response = request.send().await?;

        // HTTP 429: report a dedicated error so the retry loop can back off.
        if response.status().as_u16() == 429 {
            self.update_rate_limit_metrics().await;
            return Err(Error::RateLimitExceeded);
        }

        if !response.status().is_success() {
            let status = response.status().as_u16();
            let error_text = response
                .text()
                .await
                .unwrap_or_else(|_| "Unknown error".to_string());
            return Err(Error::Api {
                status,
                message: error_text,
            });
        }

        let text = response.text().await?;
        debug!("Response body: {}", text);

        match serde_json::from_str::<T>(&text) {
            Ok(data) => Ok(data),
            Err(e) => {
                warn!(
                    "Failed to parse JSON response: {}. Response was: {}",
                    e, text
                );
                Err(Error::Json(e))
            }
        }
    }

    /// Spaces requests out so the configured requests-per-second rate is not exceeded.
    async fn apply_rate_limiting(&self) {
        let mut last_request = self.last_request_time.lock().await;

        if let Some(last_time) = *last_request {
            // Minimum spacing between requests; `.max(1)` guards against a
            // zero rate causing a division by zero.
            let min_interval = Duration::from_millis(
                1000 / self.rate_limit_config.max_requests_per_second.max(1) as u64,
            );
            let elapsed = last_time.elapsed();

            if elapsed < min_interval {
                let sleep_time = min_interval - elapsed;
                debug!("Rate limiting: sleeping for {:?}", sleep_time);
                sleep(sleep_time).await;
            }
        }

        *last_request = Some(Instant::now());
    }

    /// Decides whether an error is transient and worth retrying.
    fn should_retry(&self, error: &Error) -> bool {
        match error {
            Error::Http(reqwest_error) => {
                reqwest_error.is_timeout()
                    || reqwest_error.is_connect()
                    || reqwest_error.is_request()
            }
            Error::RateLimitExceeded => true,
            // Retry server-side errors only; 4xx responses are not retryable.
            Error::Api { status, .. } => *status >= 500,
            _ => false,
        }
    }

    async fn update_request_metrics(&self) {
        let mut metrics = self.metrics.lock().await;
        metrics.total_requests += 1;
    }

    async fn update_success_metrics(&self, response_time: Duration) {
        let mut metrics = self.metrics.lock().await;
        metrics.successful_requests += 1;

        // Update the running average over successful requests only, since
        // failures never record a response time.
        let count = metrics.successful_requests;
        let old_avg = metrics.average_response_time_ms;
        let new_response_time = response_time.as_millis() as f64;

        metrics.average_response_time_ms =
            (old_avg * (count - 1) as f64 + new_response_time) / count as f64;
    }

    async fn update_failure_metrics(&self) {
        let mut metrics = self.metrics.lock().await;
        metrics.failed_requests += 1;
    }

    async fn update_retry_metrics(&self) {
        let mut metrics = self.metrics.lock().await;
        metrics.total_retries += 1;
    }

    async fn update_rate_limit_metrics(&self) {
        let mut metrics = self.metrics.lock().await;
        metrics.rate_limit_hits += 1;
    }

    /// Returns a snapshot of the current metrics.
    pub async fn get_metrics(&self) -> Metrics {
        self.metrics.lock().await.clone()
    }

    /// Resets all metrics counters to their defaults.
    pub async fn reset_metrics(&self) {
        let mut metrics = self.metrics.lock().await;
        *metrics = Metrics::default();
    }

    /// Returns a reference to the underlying pooled HTTP client.
    pub fn client(&self) -> &Client {
        &self.client
    }
}
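
#[cfg(test)]
mod tests {
    use super::*;

    // A minimal sanity sketch over the default configurations; the assertions
    // simply restate the `Default` impls above.
    #[test]
    fn default_configs_are_sane() {
        let rate = RateLimitConfig::default();
        assert_eq!(rate.max_requests_per_second, 300);
        assert_eq!(rate.max_burst, 5);

        let retry = RetryConfig::default();
        assert_eq!(retry.max_retries, 3);
        assert!(retry.initial_delay < retry.max_delay);

        let pool = PoolConfig::default();
        assert!(pool.idle_timeout <= pool.max_lifetime);
    }

    // Constructing the executor with default configs should succeed; no
    // network traffic is exercised here.
    #[test]
    fn builds_executor_with_defaults() {
        let executor = ProductionExecutor::new(
            PoolConfig::default(),
            RateLimitConfig::default(),
            RetryConfig::default(),
            Duration::from_secs(30),
        );
        assert!(executor.is_ok());
    }
}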