gatel_core/proxy/
forward_proxy.rs1use http::{Response, StatusCode, Version};
29use tokio::io::copy_bidirectional;
30use tokio::net::TcpStream;
31use tracing::{debug, error, warn};
32
33use crate::config::BasicAuthUser;
34use crate::{Body, ProxyError, empty_body, full_body, goals};
35
36pub struct ForwardProxy {
44 auth_users: Vec<ProxyAuthUser>,
45}
46
47struct ProxyAuthUser {
48 username: String,
49 password_hash: String,
50 is_bcrypt: bool,
51}
52
53impl ForwardProxy {
54 pub fn new(auth_users: &[BasicAuthUser]) -> Self {
55 let auth_users = auth_users
56 .iter()
57 .map(|u| {
58 let is_bcrypt = u.password_hash.starts_with("$2b$")
59 || u.password_hash.starts_with("$2a$")
60 || u.password_hash.starts_with("$2y$");
61 ProxyAuthUser {
62 username: u.username.clone(),
63 password_hash: u.password_hash.clone(),
64 is_bcrypt,
65 }
66 })
67 .collect();
68 Self { auth_users }
69 }
70}
71
72#[salvo::async_trait]
73impl salvo::Handler for ForwardProxy {
74 async fn handle(
75 &self,
76 req: &mut salvo::Request,
77 _depot: &mut salvo::Depot,
78 res: &mut salvo::Response,
79 ctrl: &mut salvo::FlowCtrl,
80 ) {
81 let client_addr = crate::hoops::client_addr(req);
82 let request = match goals::strip_request(req) {
83 Ok(r) => r,
84 Err(e) => {
85 goals::merge_response(res, e.into_response());
86 ctrl.skip_rest();
87 return;
88 }
89 };
90 let response = self
91 .run(request, client_addr)
92 .await
93 .unwrap_or_else(|e| e.into_response());
94 goals::merge_response(res, response);
95 ctrl.skip_rest();
96 }
97}
98
99impl ForwardProxy {
100 async fn run(
101 &self,
102 mut request: http::Request<Body>,
103 _client_addr: std::net::SocketAddr,
104 ) -> Result<Response<Body>, ProxyError> {
105 if request.method() != http::Method::CONNECT {
106 return Err(ProxyError::BadRequest(
107 "forward proxy only supports CONNECT".into(),
108 ));
109 }
110
111 if !self.auth_users.is_empty() {
113 match extract_proxy_credentials(&request) {
114 Some((username, password)) => {
115 let ok = self
116 .auth_users
117 .iter()
118 .any(|u| verify_proxy_user(u, &username, &password));
119 if !ok {
120 debug!(
121 username = username.as_str(),
122 "proxy authentication failed, returning 407"
123 );
124 return Ok(proxy_auth_required_response());
125 }
126 }
127 None => {
128 debug!("no Proxy-Authorization header, returning 407");
129 return Ok(proxy_auth_required_response());
130 }
131 }
132 }
133
134 if request.version() == Version::HTTP_2 {
139 warn!("HTTP/2 CONNECT tunnel is not supported; client should use HTTP/1.1");
140 return Ok(Response::builder()
141 .status(StatusCode::NOT_IMPLEMENTED)
142 .body(crate::full_body(
143 "HTTP/2 CONNECT tunneling is not supported; use HTTP/1.1",
144 ))?);
145 }
146
147 let authority = request
149 .uri()
150 .authority()
151 .map(|a| a.to_string())
152 .or_else(|| {
153 request.uri().host().map(|h| {
154 let port = request.uri().port_u16().unwrap_or(443);
155 format!("{h}:{port}")
156 })
157 })
158 .ok_or_else(|| ProxyError::BadRequest("CONNECT request missing authority".into()))?;
159
160 debug!(target = %authority, "CONNECT tunnel request");
161
162 let upstream = TcpStream::connect(&authority)
164 .await
165 .map_err(|e| ProxyError::Internal(format!("failed to connect to {authority}: {e}")))?;
166 upstream.set_nodelay(true).ok();
167
168 let client_upgrade = hyper::upgrade::on(&mut request);
171
172 let response = Response::builder()
174 .status(StatusCode::OK)
175 .body(empty_body())?;
176
177 tokio::spawn(async move {
180 match client_upgrade.await {
181 Ok(client_io) => {
182 let mut client_io = hyper_util::rt::TokioIo::new(client_io);
183 let mut upstream = upstream;
184
185 match copy_bidirectional(&mut client_io, &mut upstream).await {
186 Ok((up, down)) => {
187 debug!(
188 bytes_up = up,
189 bytes_down = down,
190 target = %authority,
191 "CONNECT tunnel closed"
192 );
193 }
194 Err(e) => {
195 debug!(error = %e, target = %authority, "CONNECT tunnel error");
196 }
197 }
198 }
199 Err(e) => {
200 error!(error = %e, "CONNECT upgrade failed");
201 }
202 }
203 });
204
205 Ok(response)
206 }
207}
208
209fn extract_proxy_credentials(req: &http::Request<Body>) -> Option<(String, String)> {
216 let header_value = req.headers().get("proxy-authorization")?.to_str().ok()?;
217 let encoded = header_value.strip_prefix("Basic ")?;
218 let decoded_bytes = base64_decode(encoded)?;
219 let decoded = String::from_utf8(decoded_bytes).ok()?;
220 let (username, password) = decoded.split_once(':')?;
221 Some((username.to_string(), password.to_string()))
222}
223
224fn base64_decode(input: &str) -> Option<Vec<u8>> {
227 const TABLE: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
228
229 let input = input.trim();
230 if input.is_empty() {
231 return Some(Vec::new());
232 }
233
234 let mut output = Vec::with_capacity(input.len() * 3 / 4);
235 let mut buf: u32 = 0;
236 let mut bits: u32 = 0;
237
238 for &b in input.as_bytes() {
239 if b == b'=' {
240 break;
241 }
242 let val = match TABLE.iter().position(|&c| c == b) {
243 Some(v) => v as u32,
244 None => {
245 if b == b'\n' || b == b'\r' || b == b' ' {
246 continue;
247 }
248 return None;
249 }
250 };
251 buf = (buf << 6) | val;
252 bits += 6;
253 if bits >= 8 {
254 bits -= 8;
255 output.push((buf >> bits) as u8);
256 buf &= (1 << bits) - 1;
257 }
258 }
259
260 Some(output)
261}
262
263fn verify_proxy_user(user: &ProxyAuthUser, username: &str, password: &str) -> bool {
265 if user.username != username {
266 return false;
267 }
268 if user.is_bcrypt {
269 #[cfg(feature = "bcrypt")]
270 {
271 bcrypt::verify(password, &user.password_hash).unwrap_or(false)
272 }
273 #[cfg(not(feature = "bcrypt"))]
274 {
275 warn!("bcrypt password hash found but bcrypt feature is not enabled, rejecting");
276 false
277 }
278 } else {
279 constant_time_eq(password.as_bytes(), user.password_hash.as_bytes())
280 }
281}
282
283fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
285 if a.len() != b.len() {
286 return false;
287 }
288 let mut diff: u8 = 0;
289 for (x, y) in a.iter().zip(b.iter()) {
290 diff |= x ^ y;
291 }
292 diff == 0
293}
294
295fn proxy_auth_required_response() -> Response<Body> {
298 Response::builder()
299 .status(StatusCode::PROXY_AUTHENTICATION_REQUIRED)
300 .header("Proxy-Authenticate", "Basic realm=\"gatel\"")
301 .body(full_body("Proxy Authentication Required"))
302 .unwrap()
303}