1use std::collections::HashMap;
7
8use bytes::{BufMut, BytesMut};
9use http::Response;
10use tokio::io::{AsyncReadExt, AsyncWriteExt};
11use tokio::net::TcpStream;
12use tracing::{debug, warn};
13
14use crate::{Body, ProxyError, full_body, goals};
15
/// Version byte written at the start of every record header.
const FCGI_VERSION_1: u8 = 1;

// Record type identifiers (second byte of a record header).

/// Record that opens a request.
const FCGI_BEGIN_REQUEST: u8 = 1;
/// Abort record. Nothing in this file sends it, so the unused-item lint is
/// silenced to keep the list of record types complete.
#[allow(dead_code)]
const FCGI_ABORT_REQUEST: u8 = 2;
/// Record that ends the response loop in `send_request`.
const FCGI_END_REQUEST: u8 = 3;
/// Stream record carrying the encoded name/value parameters.
const FCGI_PARAMS: u8 = 4;
/// Stream record carrying the request body.
const FCGI_STDIN: u8 = 5;
/// Stream record whose payload is accumulated into the CGI response.
const FCGI_STDOUT: u8 = 6;
/// Stream record whose payload is logged as a warning.
const FCGI_STDERR: u8 = 7;

/// Role placed in the BEGIN_REQUEST body.
const FCGI_RESPONDER: u16 = 1;
/// Size in bytes of a record header.
const FCGI_HEADER_LEN: usize = 8;
/// Request id used for every record sent by this transport.
const FCGI_REQUEST_ID: u16 = 1;

/// Largest payload a single record can carry: the header stores the content
/// length in a 16-bit field.
const FCGI_MAX_CONTENT_LEN: usize = u16::MAX as usize;
36
/// Transport that forwards an HTTP request to a FastCGI application server
/// over TCP and converts the CGI-style reply into an HTTP response.
///
/// `Debug` and `Clone` are derived so the transport can be logged and shared
/// between handlers like any other configuration value.
#[derive(Debug, Clone)]
pub struct FastCgiTransport {
    /// Address passed to `TcpStream::connect` for every request.
    addr: String,
    /// Directory prefix used to build `SCRIPT_FILENAME`, `PATH_TRANSLATED`
    /// and `DOCUMENT_ROOT`.
    script_root: String,
    /// Index file names; the first entry is appended when the script name
    /// ends with `/`.
    index: Vec<String>,
    /// Marker after which the URI path is split into `SCRIPT_NAME` and
    /// `PATH_INFO`; `None` disables the split.
    split_path: Option<String>,
    /// Extra parameters appended after the generated CGI variables.
    env: HashMap<String, String>,
}
56
57impl FastCgiTransport {
58 pub fn new(
60 addr: String,
61 script_root: String,
62 index: Vec<String>,
63 split_path: Option<String>,
64 env: HashMap<String, String>,
65 ) -> Self {
66 Self {
67 addr,
68 script_root,
69 index,
70 split_path,
71 env,
72 }
73 }
74
75 pub async fn send_request(
77 &self,
78 req: &http::request::Parts,
79 body: &[u8],
80 ) -> Result<Response<Body>, ProxyError> {
81 let mut stream = TcpStream::connect(&self.addr)
83 .await
84 .map_err(|e| ProxyError::Internal(format!("FastCGI connect to {}: {e}", self.addr)))?;
85
86 debug!(addr = %self.addr, "connected to FastCGI server");
87
88 let begin_body = build_begin_request_body(FCGI_RESPONDER, 0);
90 let begin_record = build_record(FCGI_BEGIN_REQUEST, FCGI_REQUEST_ID, &begin_body);
91 stream.write_all(&begin_record).await?;
92
93 let params = self.build_params(req, body.len());
95 let encoded_params = encode_params(¶ms);
96 send_stream_records(&mut stream, FCGI_PARAMS, FCGI_REQUEST_ID, &encoded_params).await?;
97 let empty_params = build_record(FCGI_PARAMS, FCGI_REQUEST_ID, &[]);
99 stream.write_all(&empty_params).await?;
100
101 send_stream_records(&mut stream, FCGI_STDIN, FCGI_REQUEST_ID, body).await?;
103 let empty_stdin = build_record(FCGI_STDIN, FCGI_REQUEST_ID, &[]);
105 stream.write_all(&empty_stdin).await?;
106
107 stream.flush().await?;
108
109 let mut stdout_buf = Vec::new();
111 let mut stderr_buf = Vec::new();
112
113 loop {
114 let header = read_record_header(&mut stream).await?;
115 let content = read_exact(&mut stream, header.content_length as usize).await?;
116 if header.padding_length > 0 {
118 let _padding = read_exact(&mut stream, header.padding_length as usize).await?;
119 }
120
121 match header.record_type {
122 FCGI_STDOUT => {
123 stdout_buf.extend_from_slice(&content);
124 }
125 FCGI_STDERR => {
126 stderr_buf.extend_from_slice(&content);
127 if !stderr_buf.is_empty() {
128 let msg = String::from_utf8_lossy(&stderr_buf);
129 warn!(fastcgi_stderr = %msg, "FastCGI stderr output");
130 }
131 }
132 FCGI_END_REQUEST => {
133 debug!("FastCGI END_REQUEST received");
134 break;
135 }
136 other => {
137 debug!(record_type = other, "ignoring unknown FastCGI record type");
138 }
139 }
140 }
141
142 parse_cgi_response(&stdout_buf)
145 }
146
147 fn build_params(
149 &self,
150 req: &http::request::Parts,
151 content_length: usize,
152 ) -> Vec<(String, String)> {
153 let uri_path = req.uri.path();
154 let query = req.uri.query().unwrap_or("");
155
156 let (script_name, path_info) = if let Some(ref split) = self.split_path {
158 split_script_path(uri_path, split)
159 } else {
160 (uri_path.to_string(), String::new())
161 };
162
163 let script_name = if script_name.ends_with('/') || script_name == "/" {
165 let idx = self
166 .index
167 .first()
168 .cloned()
169 .unwrap_or_else(|| "index.php".into());
170 format!("{}/{idx}", script_name.trim_end_matches('/'))
171 } else {
172 script_name
173 };
174
175 let script_filename = format!("{}{}", self.script_root.trim_end_matches('/'), script_name);
176
177 let path_translated = if path_info.is_empty() {
178 String::new()
179 } else {
180 format!("{}{}", self.script_root.trim_end_matches('/'), path_info)
181 };
182
183 let server_name = req
184 .headers
185 .get(http::header::HOST)
186 .and_then(|v| v.to_str().ok())
187 .map(|h| h.split(':').next().unwrap_or(h).to_string())
188 .unwrap_or_else(|| "localhost".into());
189
190 let server_port = req
191 .headers
192 .get(http::header::HOST)
193 .and_then(|v| v.to_str().ok())
194 .and_then(|h| h.split(':').nth(1))
195 .unwrap_or("80")
196 .to_string();
197
198 let content_type = req
199 .headers
200 .get(http::header::CONTENT_TYPE)
201 .and_then(|v| v.to_str().ok())
202 .unwrap_or("")
203 .to_string();
204
205 let server_protocol = format!("{:?}", req.version);
206
207 let request_uri = req
208 .uri
209 .path_and_query()
210 .map(|pq| pq.to_string())
211 .unwrap_or_else(|| uri_path.to_string());
212
213 let mut params = vec![
214 ("SCRIPT_FILENAME".into(), script_filename),
215 ("SCRIPT_NAME".into(), script_name.clone()),
216 ("DOCUMENT_ROOT".into(), self.script_root.clone()),
217 ("QUERY_STRING".into(), query.to_string()),
218 ("REQUEST_METHOD".into(), req.method.to_string()),
219 ("CONTENT_TYPE".into(), content_type),
220 ("CONTENT_LENGTH".into(), content_length.to_string()),
221 ("SERVER_NAME".into(), server_name),
222 ("SERVER_PORT".into(), server_port),
223 ("SERVER_PROTOCOL".into(), server_protocol),
224 ("REQUEST_URI".into(), request_uri),
225 ("PATH_INFO".into(), path_info.clone()),
226 ("PATH_TRANSLATED".into(), path_translated),
227 ("GATEWAY_INTERFACE".into(), "CGI/1.1".into()),
228 ("SERVER_SOFTWARE".into(), "gatel".into()),
229 ];
230
231 for (name, value) in req.headers.iter() {
233 if let Ok(val) = value.to_str() {
234 let env_name = format!("HTTP_{}", name.as_str().to_uppercase().replace('-', "_"));
235 params.push((env_name, val.to_string()));
236 }
237 }
238
239 for (k, v) in &self.env {
241 params.push((k.clone(), v.clone()));
242 }
243
244 params
245 }
246}
247
248#[salvo::async_trait]
249impl salvo::Handler for FastCgiTransport {
250 async fn handle(
251 &self,
252 req: &mut salvo::Request,
253 _depot: &mut salvo::Depot,
254 res: &mut salvo::Response,
255 ctrl: &mut salvo::FlowCtrl,
256 ) {
257 let client_addr = crate::hoops::client_addr(req);
258 let request = match goals::strip_request(req) {
259 Ok(r) => r,
260 Err(e) => {
261 goals::merge_response(res, e.into_response());
262 ctrl.skip_rest();
263 return;
264 }
265 };
266 let response = self
267 .run(request, client_addr)
268 .await
269 .unwrap_or_else(|e| e.into_response());
270 goals::merge_response(res, response);
271 ctrl.skip_rest();
272 }
273}
274
275impl FastCgiTransport {
276 async fn run(
277 &self,
278 request: http::Request<crate::Body>,
279 _client_addr: std::net::SocketAddr,
280 ) -> Result<http::Response<crate::Body>, crate::ProxyError> {
281 use http_body_util::BodyExt;
282
283 let (parts, body) = request.into_parts();
284 let body_bytes = body
285 .collect()
286 .await
287 .map_err(|e| ProxyError::Internal(format!("failed to buffer body: {e}")))?
288 .to_bytes();
289
290 self.send_request(&parts, &body_bytes).await
291 }
292}
293
294fn build_record(record_type: u8, request_id: u16, content: &[u8]) -> Vec<u8> {
300 let content_len = content.len() as u16;
301 let padding_len = padding_for(content.len());
302 let total = FCGI_HEADER_LEN + content.len() + padding_len as usize;
303
304 let mut buf = Vec::with_capacity(total);
305 buf.push(FCGI_VERSION_1);
306 buf.push(record_type);
307 buf.push((request_id >> 8) as u8);
308 buf.push(request_id as u8);
309 buf.push((content_len >> 8) as u8);
310 buf.push(content_len as u8);
311 buf.push(padding_len);
312 buf.push(0); buf.extend_from_slice(content);
314 buf.extend(std::iter::repeat_n(0u8, padding_len as usize));
316 buf
317}
318
/// Returns how many padding bytes bring `content_len` up to the next multiple
/// of eight (zero when it is already aligned).
fn padding_for(content_len: usize) -> u8 {
    const ALIGNMENT: usize = 8;
    // The outer modulo maps the "already aligned" case from 8 back to 0.
    ((ALIGNMENT - content_len % ALIGNMENT) % ALIGNMENT) as u8
}
328
/// Builds the 8-byte BEGIN_REQUEST body: the role as two big-endian bytes, the
/// flags byte, and five zero bytes.
fn build_begin_request_body(role: u16, flags: u8) -> [u8; 8] {
    let [role_hi, role_lo] = role.to_be_bytes();
    [role_hi, role_lo, flags, 0, 0, 0, 0, 0]
}
338
339fn encode_params(params: &[(String, String)]) -> Vec<u8> {
341 let mut buf = BytesMut::new();
342 for (name, value) in params {
343 encode_length(&mut buf, name.len());
344 encode_length(&mut buf, value.len());
345 buf.put_slice(name.as_bytes());
346 buf.put_slice(value.as_bytes());
347 }
348 buf.to_vec()
349}
350
351fn encode_length(buf: &mut BytesMut, len: usize) {
355 if len < 128 {
356 buf.put_u8(len as u8);
357 } else {
358 buf.put_u8(((len >> 24) as u8) | 0x80);
359 buf.put_u8((len >> 16) as u8);
360 buf.put_u8((len >> 8) as u8);
361 buf.put_u8(len as u8);
362 }
363}
364
365async fn send_stream_records(
367 stream: &mut TcpStream,
368 record_type: u8,
369 request_id: u16,
370 data: &[u8],
371) -> Result<(), ProxyError> {
372 let mut offset = 0;
373 while offset < data.len() {
374 let end = std::cmp::min(offset + FCGI_MAX_CONTENT_LEN, data.len());
375 let chunk = &data[offset..end];
376 let record = build_record(record_type, request_id, chunk);
377 stream.write_all(&record).await?;
378 offset = end;
379 }
380 Ok(())
381}
382
/// Decoded form of the 8-byte header that precedes every record read from the
/// FastCGI server.
struct RecordHeader {
    /// First header byte. Parsed but not read anywhere in this file.
    #[allow(dead_code)]
    version: u8,
    /// Second header byte; selects how the payload is handled.
    record_type: u8,
    /// Big-endian request id. Parsed but not read anywhere in this file.
    #[allow(dead_code)]
    request_id: u16,
    /// Number of payload bytes that follow the header.
    content_length: u16,
    /// Number of padding bytes that follow the payload.
    padding_length: u8,
}
396
397async fn read_record_header(stream: &mut TcpStream) -> Result<RecordHeader, ProxyError> {
398 let mut buf = [0u8; FCGI_HEADER_LEN];
399 stream
400 .read_exact(&mut buf)
401 .await
402 .map_err(|e| ProxyError::Internal(format!("failed to read FastCGI record header: {e}")))?;
403
404 Ok(RecordHeader {
405 version: buf[0],
406 record_type: buf[1],
407 request_id: u16::from_be_bytes([buf[2], buf[3]]),
408 content_length: u16::from_be_bytes([buf[4], buf[5]]),
409 padding_length: buf[6],
410 })
411}
412
413async fn read_exact(stream: &mut TcpStream, len: usize) -> Result<Vec<u8>, ProxyError> {
414 if len == 0 {
415 return Ok(Vec::new());
416 }
417 let mut buf = vec![0u8; len];
418 stream.read_exact(&mut buf).await.map_err(|e| {
419 ProxyError::Internal(format!("failed to read {len} bytes from FastCGI: {e}"))
420 })?;
421 Ok(buf)
422}
423
424fn parse_cgi_response(data: &[u8]) -> Result<Response<Body>, ProxyError> {
440 let separator = find_subsequence(data, b"\r\n\r\n");
442 let (header_bytes, body_bytes) = match separator {
443 Some(pos) => (&data[..pos], &data[pos + 4..]),
444 None => {
445 (&[] as &[u8], data)
447 }
448 };
449
450 let header_str = String::from_utf8_lossy(header_bytes);
451 let mut status = http::StatusCode::OK;
452 let mut builder = Response::builder();
453
454 for line in header_str.split("\r\n") {
455 if line.is_empty() {
456 continue;
457 }
458 if let Some(colon_pos) = line.find(':') {
459 let name = line[..colon_pos].trim();
460 let value = line[colon_pos + 1..].trim();
461
462 if name.eq_ignore_ascii_case("status") {
463 let code_str = value.split_whitespace().next().unwrap_or("200");
465 if let Ok(code) = code_str.parse::<u16>() {
466 status = http::StatusCode::from_u16(code).unwrap_or(http::StatusCode::OK);
467 }
468 } else {
469 if let (Ok(hn), Ok(hv)) = (
471 name.parse::<http::header::HeaderName>(),
472 value.parse::<http::header::HeaderValue>(),
473 ) {
474 builder = builder.header(hn, hv);
475 }
476 }
477 }
478 }
479
480 builder = builder.status(status);
481 let body = full_body(bytes::Bytes::copy_from_slice(body_bytes));
482 builder
483 .body(body)
484 .map_err(|e| ProxyError::Internal(format!("failed to build FastCGI response: {e}")))
485}
486
/// Returns the index of the first occurrence of `needle` in `haystack`.
///
/// An empty `needle` matches at index 0, mirroring `str::find("")`. Without
/// this guard `windows(0)` would panic.
fn find_subsequence(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    if needle.is_empty() {
        return Some(0);
    }
    haystack
        .windows(needle.len())
        .position(|window| window == needle)
}
493
/// Splits a request path into `(script, path_info)` right after the first
/// occurrence of `split` (for example `".php"`).
///
/// When `split` does not occur in `path`, or `split` is empty, the whole path
/// is the script and the path info is empty. The empty case is handled
/// explicitly because `str::find("")` matches at offset 0, which would produce
/// an empty script name.
fn split_script_path(path: &str, split: &str) -> (String, String) {
    if split.is_empty() {
        return (path.to_string(), String::new());
    }

    match path.find(split) {
        Some(pos) => {
            let (script, info) = path.split_at(pos + split.len());
            (script.to_string(), info.to_string())
        }
        None => (path.to_string(), String::new()),
    }
}
509
#[cfg(test)]
mod tests {
    use super::*;

    /// An 8-byte payload is already aligned, so the record is header + payload.
    #[test]
    fn test_build_record() {
        let payload = [0u8; 8];
        let bytes = build_record(FCGI_BEGIN_REQUEST, 1, &payload);
        assert_eq!(bytes[0], FCGI_VERSION_1);
        assert_eq!(bytes[1], FCGI_BEGIN_REQUEST);
        assert_eq!(bytes.len(), FCGI_HEADER_LEN + 8);
    }

    /// Lengths below 128 use the one-byte form.
    #[test]
    fn test_encode_length_short() {
        let mut out = BytesMut::new();
        encode_length(&mut out, 5);
        assert_eq!(out.len(), 1);
        assert_eq!(out[0], 5);
    }

    /// Lengths of 128 and above use four bytes with the high bit set.
    #[test]
    fn test_encode_length_long() {
        let mut out = BytesMut::new();
        encode_length(&mut out, 300);
        assert_eq!(out.len(), 4);
        assert_eq!(out[0] & 0x80, 0x80);
    }

    /// An explicit `Status` header and a regular header are both honoured.
    #[test]
    fn test_parse_cgi_response_basic() {
        let raw = b"Status: 200 OK\r\nContent-Type: text/html\r\n\r\n<h1>Hello</h1>";
        let parsed = parse_cgi_response(raw).unwrap();
        assert_eq!(parsed.status(), 200);
        assert_eq!(parsed.headers().get("content-type").unwrap(), "text/html");
    }

    /// Without a `Status` header the response defaults to 200.
    #[test]
    fn test_parse_cgi_response_no_status() {
        let raw = b"Content-Type: text/plain\r\n\r\nHello";
        let parsed = parse_cgi_response(raw).unwrap();
        assert_eq!(parsed.status(), 200);
    }

    /// The path is cut right after the split marker.
    #[test]
    fn test_split_script_path() {
        let (script, info) = split_script_path("/app/index.php/foo/bar", ".php");
        assert_eq!(script, "/app/index.php");
        assert_eq!(info, "/foo/bar");
    }

    /// A path without the marker is returned whole with empty path info.
    #[test]
    fn test_split_script_path_no_match() {
        let (script, info) = split_script_path("/app/style.css", ".php");
        assert_eq!(script, "/app/style.css");
        assert_eq!(info, "");
    }

    /// Padding rounds the content length up to a multiple of eight.
    #[test]
    fn test_padding_for() {
        let cases = [(0usize, 0u8), (8, 0), (1, 7), (10, 6)];
        for (len, expected) in cases {
            assert_eq!(padding_for(len), expected);
        }
    }

    /// One short pair encodes as two length bytes followed by name and value.
    #[test]
    fn test_encode_params() {
        let params = vec![("KEY".to_string(), "val".to_string())];
        let encoded = encode_params(&params);
        assert_eq!(encoded.len(), 8);
        assert_eq!(encoded[0], 3);
        assert_eq!(encoded[1], 3);
    }
}