// reqwest_proxy_pool/middleware.rs

use crate::config::ProxyPoolConfig;
use crate::error::NoProxyAvailable;
use crate::pool::ProxyPool;

use anyhow::anyhow;
use async_trait::async_trait;
use log::{info, warn};
use reqwest_middleware::{Error, Middleware, Next, Result};
use std::sync::Arc;

13#[derive(Clone)]
15pub struct ProxyPoolMiddleware {
16 pool: Arc<ProxyPool>,
18}
19
20impl ProxyPoolMiddleware {
21 pub async fn new(config: ProxyPoolConfig) -> Result<Self> {
24 match ProxyPool::new(config).await {
25 Ok(pool) => {
26 let (total, healthy) = pool.get_stats();
27 info!("Proxy pool initialized with {}/{} healthy proxies", healthy, total);
28
29 if healthy == 0 {
30 warn!("No healthy proxies available in pool");
31 }
32
33 Ok(Self { pool })
34 }
35 Err(e) => {
36 Err(Error::Reqwest(e))
37 }
38 }
39 }
40}
41
42#[async_trait]
43impl Middleware for ProxyPoolMiddleware {
44 async fn handle(
45 &self,
46 req: reqwest::Request,
47 _extensions: &mut http::Extensions,
48 _next: Next<'_>,
49 ) -> Result<reqwest::Response> {
50 let max_retries = self.pool.config.retry_count;
51 let mut retry_count = 0;
52
53 loop {
54 match self.pool.get_proxy() {
56 Ok(proxy) => {
57 let proxied_request = req.try_clone().ok_or_else(|| {
58 Error::Middleware(anyhow!(
59 "Request object is not cloneable. Are you passing a streaming body?"
60 .to_string()
61 ))
62 })?;
63
64 let proxy_url = proxy.url.clone();
65 info!("Using proxy: {} (attempt {})", proxy_url, retry_count + 1);
66
67 proxy.limiter.until_ready().await;
69
70 let reqwest_proxy = match proxy.to_reqwest_proxy() {
72 Ok(p) => p,
73 Err(e) => {
74 warn!("Failed to create proxy from {}: {}", proxy_url, e);
75 self.pool.report_proxy_failure(&proxy_url);
76
77 retry_count += 1;
79 if retry_count > max_retries {
80 return Err(Error::Reqwest(e));
81 }
82 continue;
83 }
84 };
85
86 let client = match reqwest::Client::builder()
88 .proxy(reqwest_proxy)
89 .timeout(self.pool.config.health_check_timeout)
90 .build() {
91 Ok(c) => c,
92 Err(e) => {
93 warn!("Failed to build client with proxy {}: {}", proxy_url, e);
94 self.pool.report_proxy_failure(&proxy_url);
95 retry_count += 1;
96 if retry_count > max_retries {
97 return Err(Error::Reqwest(e));
98 }
99 continue;
100 }
101 };
102
103 match client.execute(proxied_request).await {
105 Ok(response) => {
106 self.pool.report_proxy_success(&proxy_url);
108 return Ok(response);
109 }
110 Err(err) => {
111 warn!("Request failed with proxy {} (attempt {}): {}",
113 proxy_url, retry_count + 1, err);
114 self.pool.report_proxy_failure(&proxy_url);
115
116 retry_count += 1;
117 if retry_count > max_retries {
118 return Err(Error::Reqwest(err));
119 }
120 }
122 }
123 }
124 Err(_) => {
125 let (total, healthy) = self.pool.get_stats();
127 warn!("No proxy available. Total: {}, Healthy: {}", total, healthy);
128 return Err(Error::Middleware(anyhow!(NoProxyAvailable)));
129 }
130 }
131 }
132 }
133}