Skip to main content

reqwest_proxy_pool/
middleware.rs

1//! Middleware implementation for reqwest.
2
3use crate::classifier::ProxyResponseVerdict;
4use crate::config::ProxyPoolConfig;
5use crate::error::NoProxyAvailable;
6use crate::pool::ProxyPool;
7
8use anyhow::anyhow;
9use async_trait::async_trait;
10use log::{info, warn};
11use reqwest_middleware::{Error, Middleware, Next, Result};
12use std::sync::Arc;
13
14/// Middleware that uses a pool of proxies for HTTP requests.
15#[derive(Clone)]
16pub struct ProxyPoolMiddleware {
17    /// The proxy pool.
18    pool: Arc<ProxyPool>,
19}
20
21impl ProxyPoolMiddleware {
22    /// Create a new proxy pool middleware with the given configuration.
23    /// This will synchronously initialize the proxy pool and perform health checks.
24    pub async fn new(config: ProxyPoolConfig) -> Result<Self> {
25        match ProxyPool::new(config).await {
26            Ok(pool) => {
27                let (total, healthy) = pool.get_stats();
28                info!(
29                    "Proxy pool initialized with {}/{} healthy proxies",
30                    healthy, total
31                );
32
33                if healthy == 0 {
34                    warn!("No healthy proxies available in pool");
35                }
36
37                Ok(Self { pool })
38            }
39            Err(e) => Err(Error::Reqwest(e)),
40        }
41    }
42}
43
44#[async_trait]
45impl Middleware for ProxyPoolMiddleware {
46    async fn handle(
47        &self,
48        req: reqwest::Request,
49        _extensions: &mut http::Extensions,
50        _next: Next<'_>,
51    ) -> Result<reqwest::Response> {
52        let max_retries = self.pool.config.retry_count;
53        let mut retry_count = 0;
54
55        loop {
56            // Try to get a healthy proxy
57            match self.pool.get_proxy() {
58                Ok(proxy) => {
59                    let proxied_request = req.try_clone().ok_or_else(|| {
60                        Error::Middleware(anyhow!(
61                            "Request object is not cloneable. Are you passing a streaming body?"
62                                .to_string()
63                        ))
64                    })?;
65
66                    let proxy_url = proxy.url.clone();
67                    info!("Using proxy: {} (attempt {})", proxy_url, retry_count + 1);
68
69                    // Apply rate limiting
70                    proxy.limiter.until_ready().await;
71
72                    // Create a new client with the selected proxy
73                    let reqwest_proxy = match proxy.to_reqwest_proxy() {
74                        Ok(p) => p,
75                        Err(e) => {
76                            warn!("Failed to create proxy from {}: {}", proxy_url, e);
77                            self.pool.report_proxy_failure(&proxy_url);
78
79                            // Try another proxy if available
80                            retry_count += 1;
81                            if retry_count > max_retries {
82                                return Err(Error::Reqwest(e));
83                            }
84                            continue;
85                        }
86                    };
87
88                    // Build a new client with the proxy
89                    let client = match reqwest::Client::builder()
90                        .proxy(reqwest_proxy)
91                        .timeout(self.pool.config.health_check_timeout)
92                        .danger_accept_invalid_certs(self.pool.config.danger_accept_invalid_certs)
93                        .build()
94                    {
95                        Ok(c) => c,
96                        Err(e) => {
97                            warn!("Failed to build client with proxy {}: {}", proxy_url, e);
98                            self.pool.report_proxy_failure(&proxy_url);
99                            retry_count += 1;
100                            if retry_count > max_retries {
101                                return Err(Error::Reqwest(e));
102                            }
103                            continue;
104                        }
105                    };
106
107                    // Execute the request and pass extensions
108                    match client.execute(proxied_request).await {
109                        Ok(response) => {
110                            match self.pool.config.response_classifier.classify(&response) {
111                                ProxyResponseVerdict::Success => {
112                                    self.pool.report_proxy_success(&proxy_url);
113                                    return Ok(response);
114                                }
115                                ProxyResponseVerdict::ProxyBlocked => {
116                                    warn!(
117                                        "Proxy {} blocked by target site (attempt {})",
118                                        proxy_url,
119                                        retry_count + 1
120                                    );
121                                    self.pool.report_proxy_failure(&proxy_url);
122                                    retry_count += 1;
123                                    if retry_count > max_retries {
124                                        return Ok(response);
125                                    }
126                                    // retry with another proxy
127                                }
128                                ProxyResponseVerdict::Passthrough => {
129                                    return Ok(response);
130                                }
131                            }
132                        }
133                        Err(err) => {
134                            // Request failed
135                            warn!(
136                                "Request failed with proxy {} (attempt {}): {}",
137                                proxy_url,
138                                retry_count + 1,
139                                err
140                            );
141                            self.pool.report_proxy_failure(&proxy_url);
142
143                            retry_count += 1;
144                            if retry_count > max_retries {
145                                return Err(Error::Reqwest(err));
146                            }
147                            // Loop will continue to try another proxy
148                        }
149                    }
150                }
151                Err(_) => {
152                    // No healthy proxies available
153                    let (total, healthy) = self.pool.get_stats();
154                    warn!("No proxy available. Total: {}, Healthy: {}", total, healthy);
155                    return Err(Error::Middleware(anyhow!(NoProxyAvailable)));
156                }
157            }
158        }
159    }
160}