reqwest_proxy_pool/
middleware.rs

1//! Middleware implementation for reqwest.
2
3use crate::config::ProxyPoolConfig;
4use crate::error::NoProxyAvailable;
5use crate::pool::ProxyPool;
6
7use anyhow::anyhow;
8use async_trait::async_trait;
9use log::{info, warn};
10use reqwest_middleware::{Error, Middleware, Next, Result};
11use std::sync::Arc;
12
13/// Middleware that uses a pool of proxies for HTTP requests.
14#[derive(Clone)]
15pub struct ProxyPoolMiddleware {
16    /// The proxy pool.
17    pool: Arc<ProxyPool>,
18}
19
20impl ProxyPoolMiddleware {
21    /// Create a new proxy pool middleware with the given configuration.
22    /// This will synchronously initialize the proxy pool and perform health checks.
23    pub async fn new(config: ProxyPoolConfig) -> Result<Self> {
24        match ProxyPool::new(config).await {
25            Ok(pool) => {
26                let (total, healthy) = pool.get_stats();
27                info!("Proxy pool initialized with {}/{} healthy proxies", healthy, total);
28                
29                if healthy == 0 {
30                    warn!("No healthy proxies available in pool");
31                }
32                
33                Ok(Self { pool })
34            }
35            Err(e) => {
36                Err(Error::Reqwest(e))
37            }
38        }
39    }
40}
41
42#[async_trait]
43impl Middleware for ProxyPoolMiddleware {
44    async fn handle(
45        &self,
46        req: reqwest::Request,
47        _extensions: &mut http::Extensions,
48        _next: Next<'_>,
49    ) -> Result<reqwest::Response> {
50        let max_retries = self.pool.config.retry_count;
51        let mut retry_count = 0;
52        
53        loop {
54            // Try to get a healthy proxy
55            match self.pool.get_proxy() {
56                Ok(proxy) => {
57                    let proxied_request = req.try_clone().ok_or_else(|| {
58                        Error::Middleware(anyhow!(
59                            "Request object is not cloneable. Are you passing a streaming body?"
60                                .to_string()
61                        ))
62                    })?;
63
64                    let proxy_url = proxy.url.clone();
65                    info!("Using proxy: {} (attempt {})", proxy_url, retry_count + 1);
66                    
67                    // Apply rate limiting
68                    proxy.limiter.until_ready().await;
69                    
70                    // Create a new client with the selected proxy
71                    let reqwest_proxy = match proxy.to_reqwest_proxy() {
72                        Ok(p) => p,
73                        Err(e) => {
74                            warn!("Failed to create proxy from {}: {}", proxy_url, e);
75                            self.pool.report_proxy_failure(&proxy_url);
76                            
77                            // Try another proxy if available
78                            retry_count += 1;
79                            if retry_count > max_retries {
80                                return Err(Error::Reqwest(e));
81                            }
82                            continue;
83                        }
84                    };
85                    
86                    // Build a new client with the proxy
87                    let client = match reqwest::Client::builder()
88                        .proxy(reqwest_proxy)
89                        .timeout(self.pool.config.health_check_timeout)
90                        .build() {
91                        Ok(c) => c,
92                        Err(e) => {
93                            warn!("Failed to build client with proxy {}: {}", proxy_url, e);
94                            self.pool.report_proxy_failure(&proxy_url);
95                            retry_count += 1;
96                            if retry_count > max_retries {
97                                return Err(Error::Reqwest(e));
98                            }
99                            continue;
100                        }
101                    };
102                    
103                    // Execute the request and pass extensions
104                    match client.execute(proxied_request).await {
105                        Ok(response) => {
106                            // Request succeeded
107                            self.pool.report_proxy_success(&proxy_url);
108                            return Ok(response);
109                        }
110                        Err(err) => {
111                            // Request failed
112                            warn!("Request failed with proxy {} (attempt {}): {}", 
113                                proxy_url, retry_count + 1, err);
114                            self.pool.report_proxy_failure(&proxy_url);
115                            
116                            retry_count += 1;
117                            if retry_count > max_retries {
118                                return Err(Error::Reqwest(err));
119                            }
120                            // Loop will continue to try another proxy
121                        }
122                    }
123                }
124                Err(_) => {
125                    // No healthy proxies available
126                    let (total, healthy) = self.pool.get_stats();
127                    warn!("No proxy available. Total: {}, Healthy: {}", total, healthy);
128                    return Err(Error::Middleware(anyhow!(NoProxyAvailable)));
129                }
130            }
131        }
132    }
133}