gosh_dl/http/
connection.rs

1//! Connection Pool Management
2//!
3//! This module provides HTTP connection pooling with health checks,
4//! retry logic, and speed limiting capabilities.
5
6use crate::config::HttpConfig;
7use crate::error::{EngineError, NetworkErrorKind, Result};
8use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
9use reqwest::Client;
10use std::num::NonZeroU32;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::time::{Duration, Instant};
13use tokio::sync::RwLock;
14
/// Connection pool with rate limiting and health monitoring
///
/// Bundles a shared `reqwest::Client` with optional byte-rate limiters for
/// each direction, atomic transfer/connection counters, and request
/// statistics guarded by an async `RwLock`.
pub struct ConnectionPool {
    /// HTTP client (reqwest handles its own connection pool)
    client: Client,
    /// Global rate limiter for download speed; `None` means no limit is applied
    download_limiter: Option<DefaultDirectRateLimiter>,
    /// Global rate limiter for upload speed; `None` means no limit is applied
    upload_limiter: Option<DefaultDirectRateLimiter>,
    /// Total bytes downloaded, as reported through `record_download`
    total_downloaded: AtomicU64,
    /// Total bytes uploaded, as reported through `record_upload`
    total_uploaded: AtomicU64,
    /// Active connection count, maintained by `connection_started` /
    /// `connection_finished`
    active_connections: AtomicU64,
    /// Connection statistics, updated by the `record_*` async methods
    stats: RwLock<ConnectionStats>,
}
32
/// Connection statistics
///
/// A plain snapshot type: `ConnectionPool::stats` returns a clone of the
/// pool's current values.
#[derive(Debug, Clone, Default)]
pub struct ConnectionStats {
    /// Total connections created
    // NOTE(review): nothing in this file increments this field — confirm
    // whether it is maintained elsewhere.
    pub connections_created: u64,
    /// Total successful requests
    pub successful_requests: u64,
    /// Total failed requests
    pub failed_requests: u64,
    /// Total retried requests
    pub retried_requests: u64,
    /// Average response time in milliseconds (exponential moving average of
    /// successful requests)
    pub avg_response_time_ms: f64,
    /// Last error message
    pub last_error: Option<String>,
}
49
50impl ConnectionPool {
51    /// Create a new connection pool
52    pub fn new(config: &HttpConfig) -> Result<Self> {
53        let mut builder = Client::builder()
54            .connect_timeout(Duration::from_secs(config.connect_timeout))
55            .timeout(Duration::from_secs(config.read_timeout))
56            .redirect(reqwest::redirect::Policy::limited(config.max_redirects))
57            .danger_accept_invalid_certs(config.accept_invalid_certs)
58            .pool_max_idle_per_host(32)
59            .pool_idle_timeout(Duration::from_secs(90))
60            .gzip(true)
61            .brotli(true);
62
63        // Add proxy if configured
64        if let Some(ref proxy_url) = config.proxy_url {
65            let proxy = reqwest::Proxy::all(proxy_url)
66                .map_err(|e| EngineError::Internal(format!("Invalid proxy URL: {}", e)))?;
67            builder = builder.proxy(proxy);
68        }
69
70        let client = builder
71            .build()
72            .map_err(|e| EngineError::Internal(format!("Failed to create HTTP client: {}", e)))?;
73
74        Ok(Self {
75            client,
76            download_limiter: None,
77            upload_limiter: None,
78            total_downloaded: AtomicU64::new(0),
79            total_uploaded: AtomicU64::new(0),
80            active_connections: AtomicU64::new(0),
81            stats: RwLock::new(ConnectionStats::default()),
82        })
83    }
84
85    /// Create a connection pool with rate limiting
86    pub fn with_limits(
87        config: &HttpConfig,
88        download_limit: Option<u64>,
89        upload_limit: Option<u64>,
90    ) -> Result<Self> {
91        let mut pool = Self::new(config)?;
92
93        pool.download_limiter = download_limit.and_then(|limit| {
94            NonZeroU32::new(limit as u32).map(|n| RateLimiter::direct(Quota::per_second(n)))
95        });
96
97        pool.upload_limiter = upload_limit.and_then(|limit| {
98            NonZeroU32::new(limit as u32).map(|n| RateLimiter::direct(Quota::per_second(n)))
99        });
100
101        Ok(pool)
102    }
103
104    /// Get the underlying HTTP client
105    pub fn client(&self) -> &Client {
106        &self.client
107    }
108
109    /// Update download speed limit
110    pub fn set_download_limit(&mut self, limit: Option<u64>) {
111        self.download_limiter = limit.and_then(|l| {
112            NonZeroU32::new(l as u32).map(|n| RateLimiter::direct(Quota::per_second(n)))
113        });
114    }
115
116    /// Update upload speed limit
117    pub fn set_upload_limit(&mut self, limit: Option<u64>) {
118        self.upload_limiter = limit.and_then(|l| {
119            NonZeroU32::new(l as u32).map(|n| RateLimiter::direct(Quota::per_second(n)))
120        });
121    }
122
123    /// Wait for rate limiter permission to download bytes
124    pub async fn acquire_download(&self, bytes: u64) {
125        if let Some(ref limiter) = self.download_limiter {
126            // Acquire permission in chunks to avoid blocking too long
127            let chunk_size = 16384; // 16KB chunks
128            let chunks = (bytes / chunk_size).max(1) as u32;
129            for _ in 0..chunks {
130                if let Some(n) = NonZeroU32::new(chunk_size as u32) {
131                    let _ = limiter.until_n_ready(n).await;
132                }
133            }
134        }
135    }
136
137    /// Wait for rate limiter permission to upload bytes
138    pub async fn acquire_upload(&self, bytes: u64) {
139        if let Some(ref limiter) = self.upload_limiter {
140            let chunk_size = 16384;
141            let chunks = (bytes / chunk_size).max(1) as u32;
142            for _ in 0..chunks {
143                if let Some(n) = NonZeroU32::new(chunk_size as u32) {
144                    let _ = limiter.until_n_ready(n).await;
145                }
146            }
147        }
148    }
149
150    /// Record downloaded bytes
151    pub fn record_download(&self, bytes: u64) {
152        self.total_downloaded.fetch_add(bytes, Ordering::Relaxed);
153    }
154
155    /// Record uploaded bytes
156    pub fn record_upload(&self, bytes: u64) {
157        self.total_uploaded.fetch_add(bytes, Ordering::Relaxed);
158    }
159
160    /// Get total downloaded bytes
161    pub fn total_downloaded(&self) -> u64 {
162        self.total_downloaded.load(Ordering::Relaxed)
163    }
164
165    /// Get total uploaded bytes
166    pub fn total_uploaded(&self) -> u64 {
167        self.total_uploaded.load(Ordering::Relaxed)
168    }
169
170    /// Increment active connection count
171    pub fn connection_started(&self) {
172        self.active_connections.fetch_add(1, Ordering::Relaxed);
173    }
174
175    /// Decrement active connection count
176    pub fn connection_finished(&self) {
177        self.active_connections.fetch_sub(1, Ordering::Relaxed);
178    }
179
180    /// Get active connection count
181    pub fn active_connections(&self) -> u64 {
182        self.active_connections.load(Ordering::Relaxed)
183    }
184
185    /// Record a successful request
186    pub async fn record_success(&self, response_time_ms: f64) {
187        let mut stats = self.stats.write().await;
188        stats.successful_requests += 1;
189
190        // Update average response time (exponential moving average)
191        let alpha = 0.2;
192        stats.avg_response_time_ms =
193            alpha * response_time_ms + (1.0 - alpha) * stats.avg_response_time_ms;
194    }
195
196    /// Record a failed request
197    pub async fn record_failure(&self, error: &str) {
198        let mut stats = self.stats.write().await;
199        stats.failed_requests += 1;
200        stats.last_error = Some(error.to_string());
201    }
202
203    /// Record a retried request
204    pub async fn record_retry(&self) {
205        let mut stats = self.stats.write().await;
206        stats.retried_requests += 1;
207    }
208
209    /// Get connection statistics
210    pub async fn stats(&self) -> ConnectionStats {
211        self.stats.read().await.clone()
212    }
213}
214
/// Retry policy with exponential backoff and jitter
///
/// The delay for attempt `n` (0-indexed) starts at `initial_delay_ms`,
/// doubles per attempt, is capped at `max_delay_ms`, and is then randomly
/// scaled by up to ±`jitter_factor`.
#[derive(Debug, Clone)]
pub struct RetryPolicy {
    /// Maximum number of retry attempts
    pub max_attempts: u32,
    /// Initial delay in milliseconds
    pub initial_delay_ms: u64,
    /// Maximum delay in milliseconds (applied before jitter)
    pub max_delay_ms: u64,
    /// Jitter factor (0.0 to 1.0)
    pub jitter_factor: f64,
}
227
228impl Default for RetryPolicy {
229    fn default() -> Self {
230        Self {
231            max_attempts: 3,
232            initial_delay_ms: 1000,
233            max_delay_ms: 30000,
234            jitter_factor: 0.25,
235        }
236    }
237}
238
239impl RetryPolicy {
240    /// Create a new retry policy
241    pub fn new(max_attempts: u32, initial_delay_ms: u64, max_delay_ms: u64) -> Self {
242        Self {
243            max_attempts,
244            initial_delay_ms,
245            max_delay_ms,
246            jitter_factor: 0.25,
247        }
248    }
249
250    /// Calculate delay for a given attempt (0-indexed)
251    pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
252        // Exponential backoff
253        let base = self.initial_delay_ms * 2u64.pow(attempt.min(10));
254        let capped = base.min(self.max_delay_ms);
255
256        // Add jitter: ±jitter_factor randomness
257        let jitter = (rand::random::<f64>() - 0.5) * 2.0 * self.jitter_factor;
258        let with_jitter = (capped as f64 * (1.0 + jitter)) as u64;
259
260        Duration::from_millis(with_jitter)
261    }
262
263    /// Check if we should retry based on error type
264    pub fn should_retry(&self, attempt: u32, error: &EngineError) -> bool {
265        if attempt >= self.max_attempts {
266            return false;
267        }
268
269        error.is_retryable()
270    }
271}
272
273/// Execute a request with retry logic
274pub async fn with_retry<F, T, Fut>(
275    pool: &ConnectionPool,
276    policy: &RetryPolicy,
277    operation: F,
278) -> Result<T>
279where
280    F: Fn() -> Fut,
281    Fut: std::future::Future<Output = Result<T>>,
282{
283    let mut last_error = None;
284
285    for attempt in 0..policy.max_attempts {
286        let start = Instant::now();
287
288        match operation().await {
289            Ok(result) => {
290                let elapsed = start.elapsed().as_millis() as f64;
291                pool.record_success(elapsed).await;
292                return Ok(result);
293            }
294            Err(e) => {
295                let _elapsed = start.elapsed().as_millis() as f64;
296                pool.record_failure(&e.to_string()).await;
297
298                if policy.should_retry(attempt, &e) {
299                    pool.record_retry().await;
300                    let delay = policy.delay_for_attempt(attempt);
301                    tracing::debug!(
302                        "Request failed (attempt {}), retrying in {:?}: {}",
303                        attempt + 1,
304                        delay,
305                        e
306                    );
307                    tokio::time::sleep(delay).await;
308                    last_error = Some(e);
309                } else {
310                    return Err(e);
311                }
312            }
313        }
314    }
315
316    Err(last_error
317        .unwrap_or_else(|| EngineError::network(NetworkErrorKind::Other, "Max retries exceeded")))
318}
319
/// Speed calculator for tracking download/upload rates
///
/// Keeps a sliding window of the most recent `(bytes, timestamp)` samples
/// and derives a rate from them, alongside a running total of all bytes
/// recorded.
#[derive(Debug)]
pub struct SpeedCalculator {
    /// Maximum number of samples kept in the window (always at least 1)
    window_size: usize,
    /// Recent measurements (bytes, timestamp), oldest first
    measurements: std::collections::VecDeque<(u64, Instant)>,
    /// Total bytes tracked since creation or the last reset
    total_bytes: u64,
}

impl SpeedCalculator {
    /// Create a new speed calculator keeping at most `window_size` samples
    ///
    /// A `window_size` of zero is treated as one so that recording a sample
    /// can never fail.
    pub fn new(window_size: usize) -> Self {
        let window_size = window_size.max(1);
        Self {
            window_size,
            measurements: std::collections::VecDeque::with_capacity(window_size),
            total_bytes: 0,
        }
    }

    /// Add a measurement of `bytes` transferred, timestamped now
    ///
    /// The oldest sample is evicted once the window is full.
    pub fn add_bytes(&mut self, bytes: u64) {
        let now = Instant::now();
        self.total_bytes = self.total_bytes.saturating_add(bytes);

        if self.measurements.len() >= self.window_size {
            // O(1) eviction from the front of the deque.
            self.measurements.pop_front();
        }
        self.measurements.push_back((bytes, now));
    }

    /// Calculate current speed in bytes/second
    ///
    /// Returns 0 when fewer than two samples are available or when no time
    /// has elapsed between the oldest and newest sample. The oldest sample's
    /// bytes are excluded: they were transferred before its timestamp, i.e.
    /// outside the interval being measured.
    pub fn speed(&self) -> u64 {
        if self.measurements.len() < 2 {
            return 0;
        }
        let (first, last) = match (self.measurements.front(), self.measurements.back()) {
            (Some(first), Some(last)) => (first, last),
            _ => return 0,
        };

        let elapsed = last.1.duration_since(first.1).as_secs_f64();
        if elapsed <= 0.0 {
            return 0;
        }

        let bytes = self
            .measurements
            .iter()
            .skip(1)
            .fold(0u64, |acc, &(b, _)| acc.saturating_add(b));
        (bytes as f64 / elapsed) as u64
    }

    /// Get total bytes tracked
    pub fn total(&self) -> u64 {
        self.total_bytes
    }

    /// Reset the calculator, discarding all samples and the running total
    pub fn reset(&mut self) {
        self.measurements.clear();
        self.total_bytes = 0;
    }
}
381
#[cfg(test)]
mod tests {
    use super::*;

    /// Delays double per attempt and stay within the ±25% jitter band.
    #[test]
    fn test_retry_delay() {
        let policy = RetryPolicy::new(3, 1000, 30000);

        // First attempt: ~1000ms
        let delay0 = policy.delay_for_attempt(0);
        assert!(delay0.as_millis() >= 750 && delay0.as_millis() <= 1250);

        // Second attempt: ~2000ms
        let delay1 = policy.delay_for_attempt(1);
        assert!(delay1.as_millis() >= 1500 && delay1.as_millis() <= 2500);

        // Third attempt: ~4000ms
        let delay2 = policy.delay_for_attempt(2);
        assert!(delay2.as_millis() >= 3000 && delay2.as_millis() <= 5000);

        // Late attempts: the base is capped at 30000ms before jitter is applied
        let capped = policy.delay_for_attempt(20);
        assert!(capped.as_millis() >= 22500 && capped.as_millis() <= 37500);
    }

    /// Samples spread over time yield a positive rate and an exact total.
    #[test]
    fn test_speed_calculator() {
        let mut calc = SpeedCalculator::new(10);

        // Add measurements
        calc.add_bytes(1000);
        std::thread::sleep(Duration::from_millis(100));
        calc.add_bytes(1000);
        std::thread::sleep(Duration::from_millis(100));
        calc.add_bytes(1000);

        // The exact figure depends on scheduler timing, so only check that a
        // positive rate is reported.
        let speed = calc.speed();
        assert!(speed > 0);

        assert_eq!(calc.total(), 3000);
    }

    /// A single sample cannot produce a rate.
    #[test]
    fn test_speed_calculator_single_sample() {
        let mut calc = SpeedCalculator::new(10);
        calc.add_bytes(500);

        assert_eq!(calc.speed(), 0);
        assert_eq!(calc.total(), 500);
    }

    /// Resetting clears both the window and the running total.
    #[test]
    fn test_speed_calculator_reset() {
        let mut calc = SpeedCalculator::new(10);
        calc.add_bytes(1000);
        calc.add_bytes(1000);

        calc.reset();

        assert_eq!(calc.total(), 0);
        assert_eq!(calc.speed(), 0);
    }

    /// The default policy matches the documented defaults.
    #[test]
    fn test_retry_policy_defaults() {
        let policy = RetryPolicy::default();
        assert_eq!(policy.max_attempts, 3);
        assert_eq!(policy.initial_delay_ms, 1000);
        assert_eq!(policy.max_delay_ms, 30000);
        assert!((policy.jitter_factor - 0.25).abs() < f64::EPSILON);
    }
}
429}