reqwest_proxy_pool/
middleware.rs1use crate::classifier::ProxyResponseVerdict;
4use crate::config::ProxyPoolConfig;
5use crate::error::NoProxyAvailable;
6use crate::pool::ProxyPool;
7
8use anyhow::anyhow;
9use async_trait::async_trait;
10use log::{info, warn};
11use reqwest_middleware::{Error, Middleware, Next, Result};
12use std::sync::Arc;
13
14#[derive(Clone)]
16pub struct ProxyPoolMiddleware {
17 pool: Arc<ProxyPool>,
19}
20
21impl ProxyPoolMiddleware {
22 pub async fn new(config: ProxyPoolConfig) -> Result<Self> {
25 match ProxyPool::new(config).await {
26 Ok(pool) => {
27 let (total, healthy) = pool.get_stats();
28 info!(
29 "Proxy pool initialized with {}/{} healthy proxies",
30 healthy, total
31 );
32
33 if healthy == 0 {
34 warn!("No healthy proxies available in pool");
35 }
36
37 Ok(Self { pool })
38 }
39 Err(e) => Err(Error::Reqwest(e)),
40 }
41 }
42}
43
44#[async_trait]
45impl Middleware for ProxyPoolMiddleware {
46 async fn handle(
47 &self,
48 req: reqwest::Request,
49 _extensions: &mut http::Extensions,
50 _next: Next<'_>,
51 ) -> Result<reqwest::Response> {
52 let max_retries = self.pool.config.retry_count;
53 let mut retry_count = 0;
54
55 loop {
56 match self.pool.get_proxy() {
58 Ok(proxy) => {
59 let proxied_request = req.try_clone().ok_or_else(|| {
60 Error::Middleware(anyhow!(
61 "Request object is not cloneable. Are you passing a streaming body?"
62 .to_string()
63 ))
64 })?;
65
66 let proxy_url = proxy.url.clone();
67 info!("Using proxy: {} (attempt {})", proxy_url, retry_count + 1);
68
69 proxy.limiter.until_ready().await;
71
72 let reqwest_proxy = match proxy.to_reqwest_proxy() {
74 Ok(p) => p,
75 Err(e) => {
76 warn!("Failed to create proxy from {}: {}", proxy_url, e);
77 self.pool.report_proxy_failure(&proxy_url);
78
79 retry_count += 1;
81 if retry_count > max_retries {
82 return Err(Error::Reqwest(e));
83 }
84 continue;
85 }
86 };
87
88 let client = match reqwest::Client::builder()
90 .proxy(reqwest_proxy)
91 .timeout(self.pool.config.health_check_timeout)
92 .danger_accept_invalid_certs(self.pool.config.danger_accept_invalid_certs)
93 .build()
94 {
95 Ok(c) => c,
96 Err(e) => {
97 warn!("Failed to build client with proxy {}: {}", proxy_url, e);
98 self.pool.report_proxy_failure(&proxy_url);
99 retry_count += 1;
100 if retry_count > max_retries {
101 return Err(Error::Reqwest(e));
102 }
103 continue;
104 }
105 };
106
107 match client.execute(proxied_request).await {
109 Ok(response) => {
110 match self.pool.config.response_classifier.classify(&response) {
111 ProxyResponseVerdict::Success => {
112 self.pool.report_proxy_success(&proxy_url);
113 return Ok(response);
114 }
115 ProxyResponseVerdict::ProxyBlocked => {
116 warn!(
117 "Proxy {} blocked by target site (attempt {})",
118 proxy_url,
119 retry_count + 1
120 );
121 self.pool.report_proxy_failure(&proxy_url);
122 retry_count += 1;
123 if retry_count > max_retries {
124 return Ok(response);
125 }
126 }
128 ProxyResponseVerdict::Passthrough => {
129 return Ok(response);
130 }
131 }
132 }
133 Err(err) => {
134 warn!(
136 "Request failed with proxy {} (attempt {}): {}",
137 proxy_url,
138 retry_count + 1,
139 err
140 );
141 self.pool.report_proxy_failure(&proxy_url);
142
143 retry_count += 1;
144 if retry_count > max_retries {
145 return Err(Error::Reqwest(err));
146 }
147 }
149 }
150 }
151 Err(_) => {
152 let (total, healthy) = self.pool.get_stats();
154 warn!("No proxy available. Total: {}, Healthy: {}", total, healthy);
155 return Err(Error::Middleware(anyhow!(NoProxyAvailable)));
156 }
157 }
158 }
159 }
160}