1use crate::config::HttpConfig;
7use crate::error::{EngineError, NetworkErrorKind, Result};
8use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
9use reqwest::Client;
10use std::num::NonZeroU32;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::time::{Duration, Instant};
13use tokio::sync::RwLock;
14
15pub struct ConnectionPool {
17 client: Client,
19 download_limiter: Option<DefaultDirectRateLimiter>,
21 upload_limiter: Option<DefaultDirectRateLimiter>,
23 total_downloaded: AtomicU64,
25 total_uploaded: AtomicU64,
27 active_connections: AtomicU64,
29 stats: RwLock<ConnectionStats>,
31}
32
/// Snapshot of the request statistics kept by a `ConnectionPool`.
///
/// All counters start at zero and `last_error` starts as `None` (derived `Default`).
#[derive(Clone, Debug, Default)]
pub struct ConnectionStats {
    /// Count of connections created.
    // NOTE(review): nothing in the visible part of this file updates this field.
    pub connections_created: u64,
    /// Count of requests recorded as successful.
    pub successful_requests: u64,
    /// Count of requests recorded as failed.
    pub failed_requests: u64,
    /// Count of retries recorded.
    pub retried_requests: u64,
    /// Exponentially weighted moving average of response times, in milliseconds.
    pub avg_response_time_ms: f64,
    /// Message of the most recently recorded failure, if any.
    pub last_error: Option<String>,
}
49
50impl ConnectionPool {
51 pub fn new(config: &HttpConfig) -> Result<Self> {
53 let mut builder = Client::builder()
54 .connect_timeout(Duration::from_secs(config.connect_timeout))
55 .timeout(Duration::from_secs(config.read_timeout))
56 .redirect(reqwest::redirect::Policy::limited(config.max_redirects))
57 .danger_accept_invalid_certs(config.accept_invalid_certs)
58 .pool_max_idle_per_host(32)
59 .pool_idle_timeout(Duration::from_secs(90))
60 .gzip(true)
61 .brotli(true);
62
63 if let Some(ref proxy_url) = config.proxy_url {
65 let proxy = reqwest::Proxy::all(proxy_url)
66 .map_err(|e| EngineError::Internal(format!("Invalid proxy URL: {}", e)))?;
67 builder = builder.proxy(proxy);
68 }
69
70 let client = builder
71 .build()
72 .map_err(|e| EngineError::Internal(format!("Failed to create HTTP client: {}", e)))?;
73
74 Ok(Self {
75 client,
76 download_limiter: None,
77 upload_limiter: None,
78 total_downloaded: AtomicU64::new(0),
79 total_uploaded: AtomicU64::new(0),
80 active_connections: AtomicU64::new(0),
81 stats: RwLock::new(ConnectionStats::default()),
82 })
83 }
84
85 pub fn with_limits(
87 config: &HttpConfig,
88 download_limit: Option<u64>,
89 upload_limit: Option<u64>,
90 ) -> Result<Self> {
91 let mut pool = Self::new(config)?;
92
93 pool.download_limiter = download_limit.and_then(|limit| {
94 NonZeroU32::new(limit as u32).map(|n| RateLimiter::direct(Quota::per_second(n)))
95 });
96
97 pool.upload_limiter = upload_limit.and_then(|limit| {
98 NonZeroU32::new(limit as u32).map(|n| RateLimiter::direct(Quota::per_second(n)))
99 });
100
101 Ok(pool)
102 }
103
104 pub fn client(&self) -> &Client {
106 &self.client
107 }
108
109 pub fn set_download_limit(&mut self, limit: Option<u64>) {
111 self.download_limiter = limit.and_then(|l| {
112 NonZeroU32::new(l as u32).map(|n| RateLimiter::direct(Quota::per_second(n)))
113 });
114 }
115
116 pub fn set_upload_limit(&mut self, limit: Option<u64>) {
118 self.upload_limiter = limit.and_then(|l| {
119 NonZeroU32::new(l as u32).map(|n| RateLimiter::direct(Quota::per_second(n)))
120 });
121 }
122
123 pub async fn acquire_download(&self, bytes: u64) {
125 if let Some(ref limiter) = self.download_limiter {
126 let chunk_size = 16384; let chunks = (bytes / chunk_size).max(1) as u32;
129 for _ in 0..chunks {
130 if let Some(n) = NonZeroU32::new(chunk_size as u32) {
131 let _ = limiter.until_n_ready(n).await;
132 }
133 }
134 }
135 }
136
137 pub async fn acquire_upload(&self, bytes: u64) {
139 if let Some(ref limiter) = self.upload_limiter {
140 let chunk_size = 16384;
141 let chunks = (bytes / chunk_size).max(1) as u32;
142 for _ in 0..chunks {
143 if let Some(n) = NonZeroU32::new(chunk_size as u32) {
144 let _ = limiter.until_n_ready(n).await;
145 }
146 }
147 }
148 }
149
150 pub fn record_download(&self, bytes: u64) {
152 self.total_downloaded.fetch_add(bytes, Ordering::Relaxed);
153 }
154
155 pub fn record_upload(&self, bytes: u64) {
157 self.total_uploaded.fetch_add(bytes, Ordering::Relaxed);
158 }
159
160 pub fn total_downloaded(&self) -> u64 {
162 self.total_downloaded.load(Ordering::Relaxed)
163 }
164
165 pub fn total_uploaded(&self) -> u64 {
167 self.total_uploaded.load(Ordering::Relaxed)
168 }
169
170 pub fn connection_started(&self) {
172 self.active_connections.fetch_add(1, Ordering::Relaxed);
173 }
174
175 pub fn connection_finished(&self) {
177 self.active_connections.fetch_sub(1, Ordering::Relaxed);
178 }
179
180 pub fn active_connections(&self) -> u64 {
182 self.active_connections.load(Ordering::Relaxed)
183 }
184
185 pub async fn record_success(&self, response_time_ms: f64) {
187 let mut stats = self.stats.write().await;
188 stats.successful_requests += 1;
189
190 let alpha = 0.2;
192 stats.avg_response_time_ms =
193 alpha * response_time_ms + (1.0 - alpha) * stats.avg_response_time_ms;
194 }
195
196 pub async fn record_failure(&self, error: &str) {
198 let mut stats = self.stats.write().await;
199 stats.failed_requests += 1;
200 stats.last_error = Some(error.to_string());
201 }
202
203 pub async fn record_retry(&self) {
205 let mut stats = self.stats.write().await;
206 stats.retried_requests += 1;
207 }
208
209 pub async fn stats(&self) -> ConnectionStats {
211 self.stats.read().await.clone()
212 }
213}
214
/// Exponential back-off settings used when retrying failed requests.
#[derive(Clone, Debug)]
pub struct RetryPolicy {
    /// Attempt count at which `should_retry` stops allowing retries.
    pub max_attempts: u32,
    /// Delay for attempt `0`, in milliseconds, before jitter.
    pub initial_delay_ms: u64,
    /// Upper bound applied to the delay, in milliseconds, before jitter.
    pub max_delay_ms: u64,
    /// Relative jitter: the delay is scaled by a random factor within
    /// `1.0 ± jitter_factor`.
    pub jitter_factor: f64,
}
227
228impl Default for RetryPolicy {
229 fn default() -> Self {
230 Self {
231 max_attempts: 3,
232 initial_delay_ms: 1000,
233 max_delay_ms: 30000,
234 jitter_factor: 0.25,
235 }
236 }
237}
238
239impl RetryPolicy {
240 pub fn new(max_attempts: u32, initial_delay_ms: u64, max_delay_ms: u64) -> Self {
242 Self {
243 max_attempts,
244 initial_delay_ms,
245 max_delay_ms,
246 jitter_factor: 0.25,
247 }
248 }
249
250 pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
252 let base = self.initial_delay_ms * 2u64.pow(attempt.min(10));
254 let capped = base.min(self.max_delay_ms);
255
256 let jitter = (rand::random::<f64>() - 0.5) * 2.0 * self.jitter_factor;
258 let with_jitter = (capped as f64 * (1.0 + jitter)) as u64;
259
260 Duration::from_millis(with_jitter)
261 }
262
263 pub fn should_retry(&self, attempt: u32, error: &EngineError) -> bool {
265 if attempt >= self.max_attempts {
266 return false;
267 }
268
269 error.is_retryable()
270 }
271}
272
273pub async fn with_retry<F, T, Fut>(
275 pool: &ConnectionPool,
276 policy: &RetryPolicy,
277 operation: F,
278) -> Result<T>
279where
280 F: Fn() -> Fut,
281 Fut: std::future::Future<Output = Result<T>>,
282{
283 let mut last_error = None;
284
285 for attempt in 0..policy.max_attempts {
286 let start = Instant::now();
287
288 match operation().await {
289 Ok(result) => {
290 let elapsed = start.elapsed().as_millis() as f64;
291 pool.record_success(elapsed).await;
292 return Ok(result);
293 }
294 Err(e) => {
295 let _elapsed = start.elapsed().as_millis() as f64;
296 pool.record_failure(&e.to_string()).await;
297
298 if policy.should_retry(attempt, &e) {
299 pool.record_retry().await;
300 let delay = policy.delay_for_attempt(attempt);
301 tracing::debug!(
302 "Request failed (attempt {}), retrying in {:?}: {}",
303 attempt + 1,
304 delay,
305 e
306 );
307 tokio::time::sleep(delay).await;
308 last_error = Some(e);
309 } else {
310 return Err(e);
311 }
312 }
313 }
314 }
315
316 Err(last_error
317 .unwrap_or_else(|| EngineError::network(NetworkErrorKind::Other, "Max retries exceeded")))
318}
319
/// Estimates transfer speed from a sliding window of recent byte counts.
#[derive(Debug)]
pub struct SpeedCalculator {
    /// Maximum number of samples kept in the window.
    window_size: usize,
    /// `(bytes, timestamp)` samples, oldest at the front.
    measurements: std::collections::VecDeque<(u64, Instant)>,
    /// Sum of all bytes added since creation or the last `reset`.
    total_bytes: u64,
}

impl SpeedCalculator {
    /// Creates a calculator that keeps at most `window_size` samples.
    ///
    /// With a `window_size` of zero no samples are kept: `total` still
    /// accumulates, and `speed` always reports `0`.
    pub fn new(window_size: usize) -> Self {
        Self {
            window_size,
            measurements: std::collections::VecDeque::with_capacity(window_size),
            total_bytes: 0,
        }
    }

    /// Records `bytes` transferred, timestamped with the current instant.
    ///
    /// The oldest sample is dropped once the window is full. The running total
    /// saturates at `u64::MAX` instead of overflowing.
    pub fn add_bytes(&mut self, bytes: u64) {
        let now = Instant::now();
        self.total_bytes = self.total_bytes.saturating_add(bytes);

        // A zero-sized window holds no samples, so there is nothing to evict or store.
        if self.window_size == 0 {
            return;
        }

        while self.measurements.len() >= self.window_size {
            self.measurements.pop_front();
        }
        self.measurements.push_back((bytes, now));
    }

    /// Returns the windowed speed in bytes per second.
    ///
    /// This is the sum of all bytes in the window divided by the time between
    /// the oldest and newest sample. It returns `0` with fewer than two
    /// samples or when no time has elapsed between them.
    pub fn speed(&self) -> u64 {
        let (oldest, newest) = match (self.measurements.front(), self.measurements.back()) {
            (Some(&(_, oldest)), Some(&(_, newest))) if self.measurements.len() >= 2 => {
                (oldest, newest)
            }
            _ => return 0,
        };

        let elapsed = newest.duration_since(oldest).as_secs_f64();
        if elapsed <= 0.0 {
            return 0;
        }

        let bytes = self
            .measurements
            .iter()
            .fold(0u64, |sum, &(sample, _)| sum.saturating_add(sample));
        (bytes as f64 / elapsed) as u64
    }

    /// Returns the total bytes added since creation or the last `reset`.
    pub fn total(&self) -> u64 {
        self.total_bytes
    }

    /// Drops all samples and zeroes the running total.
    pub fn reset(&mut self) {
        self.measurements.clear();
        self.total_bytes = 0;
    }
}
381
#[cfg(test)]
mod tests {
    use super::*;

    /// Each attempt's delay must fall within ±25% of its nominal value.
    #[test]
    fn test_retry_delay() {
        let policy = RetryPolicy::new(3, 1000, 30000);

        // (attempt, lowest allowed ms, highest allowed ms)
        let expectations: [(u32, u128, u128); 3] =
            [(0, 750, 1250), (1, 1500, 2500), (2, 3000, 5000)];

        for &(attempt, lowest, highest) in expectations.iter() {
            let millis = policy.delay_for_attempt(attempt).as_millis();
            assert!(millis >= lowest && millis <= highest);
        }
    }

    /// Three samples spaced 100 ms apart yield a non-zero speed and an exact total.
    #[test]
    fn test_speed_calculator() {
        let mut calc = SpeedCalculator::new(10);

        calc.add_bytes(1000);
        for _ in 0..2 {
            std::thread::sleep(Duration::from_millis(100));
            calc.add_bytes(1000);
        }

        assert!(calc.speed() > 0);
        assert_eq!(calc.total(), 3000);
    }

    /// The default policy exposes the documented attempt and delay limits.
    #[test]
    fn test_retry_policy_defaults() {
        let RetryPolicy {
            max_attempts,
            initial_delay_ms,
            max_delay_ms,
            ..
        } = RetryPolicy::default();

        assert_eq!(max_attempts, 3);
        assert_eq!(initial_delay_ms, 1000);
        assert_eq!(max_delay_ms, 30000);
    }
}