1#[cfg(test)]
2pub mod tests;
3#[cfg(test)]
4mod example;
5
6use std::io::prelude::*;
7use std::borrow::Borrow;
8use std::net::{IpAddr, SocketAddr, TcpListener};
9use std::str::FromStr;
10use std::time::Duration;
11
12use crate::request::{METHOD, Request};
13use crate::response::{Response, STATUS_CODE_REASON_PHRASE};
14use crate::app::App;
15use crate::application::Application;
16use crate::entry_point::{bootstrap, get_ip_port_thread_count, get_request_allocation_size, set_default_values};
17use crate::header::Header;
18use crate::log::Log;
19use crate::mime_type::MimeType;
20use crate::range::{ContentRange, Range};
21use crate::symbol::SYMBOL;
22use crate::thread_pool::ThreadPool;
23
/// Stateless namespace for the web server.
///
/// The type carries no data; everything in this file is exposed as associated
/// functions (`Server::setup`, `Server::run`, `Server::process`, ...).
pub struct Server {}
25impl Server {
26 pub fn process_request(mut stream: impl Read + Write + Unpin, peer_addr: SocketAddr) -> Vec<u8> {
27 let request_allocation_size = get_request_allocation_size();
28 let mut buffer = vec![0; request_allocation_size as usize];
29 let boxed_read = stream.read(&mut buffer);
30 if boxed_read.is_err() {
31 let message = boxed_read.err().unwrap().to_string();
32 eprintln!("unable to read TCP stream {}", &message);
33
34 let raw_response = Server::bad_request_response(message);
35 let boxed_stream = stream.write(raw_response.borrow());
36 if boxed_stream.is_ok() {
37 stream.flush().unwrap();
38 };
39 return raw_response;
40 }
41
42 boxed_read.unwrap();
43 let request : &[u8] = &buffer;
44
45 let boxed_request = Request::parse_request(request);
50 if boxed_request.is_err() {
51 let message = boxed_request.err().unwrap();
52 eprintln!("unable to parse request: {}", &message);
53
54 let raw_response = Server::bad_request_response(message);
55 let boxed_stream = stream.write(raw_response.borrow());
56 if boxed_stream.is_ok() {
57 stream.flush().unwrap();
58 };
59 return raw_response;
60 }
61
62
63 let request: Request = boxed_request.unwrap();
64 let (response, request) = App::handle_request(request);
65
66
67 let log_request_response = Log::combined(&request, &response, &peer_addr);
68 println!("{}", log_request_response);
69 let raw_response = Response::generate_response(response, request);
70
71 let boxed_stream = stream.write(raw_response.borrow());
72 if boxed_stream.is_ok() {
73 stream.flush().unwrap();
74 };
75
76 raw_response
77 }
78
79 pub fn bad_request_response(message: String) -> Vec<u8> {
80 let error_request = Request {
81 method: METHOD.get.to_string(),
82 request_uri: "".to_string(),
83 http_version: "".to_string(),
84 headers: vec![],
85 body: vec![],
86 };
87
88 let size = message.chars().count() as u64;
89 let content_range = ContentRange {
90 unit: Range::BYTES.to_string(),
91 range: Range { start: 0, end: size },
92 size: size.to_string(),
93 body: Vec::from(message.as_bytes()),
94 content_type: MimeType::TEXT_PLAIN.to_string(),
95 };
96
97 let header_list = Header::get_header_list(&error_request);
98 let error_response: Response = Response::get_response(
99 STATUS_CODE_REASON_PHRASE.n400_bad_request,
100 Some(header_list),
101 Some(vec![content_range])
102 );
103
104 let response = Response::generate_response(error_response, error_request);
105 return response;
106 }
107
108 pub fn payload_too_large_response(message: String) -> Vec<u8> {
114 let error_request = Request {
115 method: METHOD.get.to_string(),
116 request_uri: "".to_string(),
117 http_version: "".to_string(),
118 headers: vec![],
119 body: vec![],
120 };
121
122 let size = message.chars().count() as u64;
123 let content_range = ContentRange {
124 unit: Range::BYTES.to_string(),
125 range: Range { start: 0, end: size },
126 size: size.to_string(),
127 body: Vec::from(message.as_bytes()),
128 content_type: MimeType::TEXT_PLAIN.to_string(),
129 };
130
131 let mut header_list = Header::get_header_list(&error_request);
132 header_list.push(Header { name: Header::_CONNECTION.to_string(), value: "close".to_string() });
133 let error_response: Response = Response::get_response(
134 STATUS_CODE_REASON_PHRASE.n413_payload_too_large,
135 Some(header_list),
136 Some(vec![content_range])
137 );
138
139 Response::generate_response(error_response, error_request)
140 }
141
142 pub fn process(mut stream: impl Read + Write + Unpin,
143 connection: ConnectionInfo,
144 app: impl Application) -> Result<(), String> {
145 use crate::http::VERSION;
146
147 let request_allocation_size = connection.request_size;
148 let client = connection.client.clone();
149 let client_addr = SocketAddr::new(IpAddr::from_str(client.ip.as_str()).unwrap(), client.port as u16);
150
151 let max_body_size = crate::entry_point::get_max_body_size();
152
153 loop {
154 let mut buffer = vec![0; request_allocation_size as usize];
155 let boxed_read = stream.read(&mut buffer);
156 let n = match boxed_read {
157 Err(_) => break,
159 Ok(0) => break,
160 Ok(n) => n,
161 };
162
163 let mut request = match Request::parse(&buffer[..n]) {
166 Ok(r) => r,
167 Err(message) => {
168 let raw_response = Server::bad_request_response(message.clone());
169 let boxed_stream = stream.write(raw_response.borrow());
170 if boxed_stream.is_ok() { stream.flush().unwrap(); }
171 return Err(message);
172 }
173 };
174
175 if let Some(declared_len) = request
179 .get_header(Header::_CONTENT_LENGTH.to_string())
180 .and_then(|h| h.value.trim().parse::<u64>().ok())
181 {
182 if max_body_size > 0 && declared_len > max_body_size {
183 let raw_response = Server::payload_too_large_response(format!(
184 "413 Payload Too Large: declared Content-Length {} exceeds the {} byte limit",
185 declared_len, max_body_size
186 ));
187 let boxed_stream = stream.write(raw_response.borrow());
188 if boxed_stream.is_ok() { stream.flush().unwrap(); }
189 break;
190 }
191
192 while (request.body.len() as u64) < declared_len {
193 let mut more = vec![0u8; request_allocation_size as usize];
194 match stream.read(&mut more) {
195 Ok(0) | Err(_) => break, Ok(k) => request.body.extend_from_slice(&more[..k]),
197 }
198 }
199 }
200
201 let keep_alive = {
202 let conn_hdr = request.get_header(Header::_CONNECTION.to_string());
203 match conn_hdr {
204 Some(h) => h.value.to_lowercase() != "close",
205 None => request.http_version == VERSION.http_1_1,
206 }
207 };
208
209 let mut response = match app.execute(&request, &connection) {
210 Ok(r) => r,
211 Err(message) => {
212 let raw_response = Server::bad_request_response(message.clone());
213 let boxed_stream = stream.write(raw_response.borrow());
214 if boxed_stream.is_ok() { stream.flush().unwrap(); }
215 return Err(message);
216 }
217 };
218
219 crate::metrics::record_request();
220 crate::compression::apply_gzip(&request, &mut response);
221
222 response.headers.push(Header {
223 name: Header::_CONNECTION.to_string(),
224 value: if keep_alive { "keep-alive".to_string() } else { "close".to_string() },
225 });
226
227 Log::log_access(&request, &response, &client_addr);
228
229 if let Some(reader) = response.stream_pipe.take() {
230 if let Err(e) = Server::pipe_stream(&mut stream, response, request, reader) {
231 return Err(e);
232 }
233 } else if let Some(ref filepath) = response.stream_file.clone() {
234 if let Err(e) = Server::write_chunked_file(&mut stream, response, request, filepath) {
235 return Err(e);
236 }
237 } else {
238 let raw_response = Response::generate_response(response, request);
239 if let Err(e) = stream.write(raw_response.borrow()) {
240 return Err(e.to_string());
241 }
242 stream.flush().unwrap();
243 }
244
245 if !keep_alive { break; }
246 }
247
248 Ok(())
249 }
250
251 pub(crate) fn write_chunked_file(
254 stream: &mut impl Write,
255 mut response: Response,
256 request: Request,
257 filepath: &str,
258 ) -> Result<(), String> {
259 use std::fs::File;
260 use std::io::Read as _;
261
262 response.headers.push(Header {
263 name: Header::_TRANSFER_ENCODING.to_string(),
264 value: "chunked".to_string(),
265 });
266
267 let status = [
269 response.http_version.clone(),
270 response.status_code.to_string(),
271 response.reason_phrase.clone(),
272 ].join(SYMBOL.whitespace);
273
274 let mut headers_str = SYMBOL.new_line_carriage_return.to_string();
275 for header in &response.headers {
276 headers_str.push_str(&header.name);
277 headers_str.push_str(Header::NAME_VALUE_SEPARATOR);
278 headers_str.push_str(&header.value);
279 headers_str.push_str(SYMBOL.new_line_carriage_return);
280 }
281 let head = format!("{}{}{}", status, headers_str, SYMBOL.new_line_carriage_return);
282
283 stream.write_all(head.as_bytes()).map_err(|e| e.to_string())?;
284
285 if request.method != METHOD.head && request.method != METHOD.options {
286 let mut file = File::open(filepath).map_err(|e| e.to_string())?;
287 let mut buf = vec![0u8; 65536];
288 loop {
289 let n = file.read(&mut buf).map_err(|e| e.to_string())?;
290 if n == 0 { break; }
291 stream.write_all(format!("{:x}\r\n", n).as_bytes()).map_err(|e| e.to_string())?;
293 stream.write_all(&buf[..n]).map_err(|e| e.to_string())?;
294 stream.write_all(b"\r\n").map_err(|e| e.to_string())?;
295 }
296 stream.write_all(b"0\r\n\r\n").map_err(|e| e.to_string())?;
298 }
299
300 stream.flush().map_err(|e| e.to_string())
301 }
302
303 pub(crate) fn pipe_stream(
311 stream: &mut impl Write,
312 mut response: Response,
313 request: Request,
314 mut reader: Box<dyn std::io::Read + Send>,
315 ) -> Result<(), String> {
316 let backend_is_chunked = response.headers.iter().any(|h| {
320 h.name.eq_ignore_ascii_case("transfer-encoding")
321 && h.value.to_lowercase().contains("chunked")
322 });
323
324 response.headers.retain(|h| !h.name.eq_ignore_ascii_case("content-length"));
325 if !backend_is_chunked {
326 response.headers.push(Header {
327 name: Header::_TRANSFER_ENCODING.to_string(),
328 value: "chunked".to_string(),
329 });
330 }
331
332 let status = [
333 response.http_version.clone(),
334 response.status_code.to_string(),
335 response.reason_phrase.clone(),
336 ].join(SYMBOL.whitespace);
337
338 let mut head = format!("{}\r\n", status);
339 for header in &response.headers {
340 head.push_str(&header.name);
341 head.push_str(Header::NAME_VALUE_SEPARATOR);
342 head.push_str(&header.value);
343 head.push_str(SYMBOL.new_line_carriage_return);
344 }
345 head.push_str(SYMBOL.new_line_carriage_return);
346 stream.write_all(head.as_bytes()).map_err(|e| e.to_string())?;
347
348 if request.method != METHOD.head && request.method != METHOD.options {
349 let mut buf = [0u8; 8192];
350 if backend_is_chunked {
351 loop {
354 match reader.read(&mut buf) {
355 Ok(0) | Err(_) => break,
356 Ok(n) => {
357 stream.write_all(&buf[..n]).map_err(|e| e.to_string())?;
358 stream.flush().map_err(|e| e.to_string())?;
359 }
360 }
361 }
362 } else {
363 loop {
367 match reader.read(&mut buf) {
368 Ok(0) | Err(_) => break,
369 Ok(n) => {
370 stream
371 .write_all(format!("{:x}\r\n", n).as_bytes())
372 .map_err(|e| e.to_string())?;
373 stream.write_all(&buf[..n]).map_err(|e| e.to_string())?;
374 stream.write_all(b"\r\n").map_err(|e| e.to_string())?;
375 stream.flush().map_err(|e| e.to_string())?;
376 }
377 }
378 }
379 stream.write_all(b"0\r\n\r\n").map_err(|e| e.to_string())?;
380 }
381 }
382
383 stream.flush().map_err(|e| e.to_string())
384 }
385
386 pub fn setup() -> Result<(TcpListener, ThreadPool), String> {
389 let info = Log::info("Rust Web Server");
390 println!("{}", info);
391
392 let usage_info = Log::usage_information();
393 println!("{}", usage_info);
394
395
396 println!("RWS Configuration Start: \n");
397
398 set_default_values();
399 bootstrap();
400
401 println!("\nRWS Configuration End\n\n");
402
403
404 let (ip, port, thread_count) = get_ip_port_thread_count();
405
406
407 let mut ip_readable = ip.to_string();
408
409 if ip.contains(":") {
410 ip_readable = [SYMBOL.opening_square_bracket, &ip, SYMBOL.closing_square_bracket].join("");
411 }
412
413 let bind_addr = [ip_readable, SYMBOL.colon.to_string(), port.to_string()].join(SYMBOL.empty_string);
414
415 #[cfg(feature = "http2")]
416 let protocol = {
417 let cert = std::env::var(crate::entry_point::Config::RWS_CONFIG_TLS_CERT_FILE).unwrap_or_default();
418 if cert.is_empty() { "http" } else { "https" }
419 };
420 #[cfg(not(feature = "http2"))]
421 let protocol = "http";
422
423 println!("Setting up {}://{}...", protocol, &bind_addr);
424
425 let boxed_listener = TcpListener::bind(&bind_addr);
426 if boxed_listener.is_err() {
427 let message = format!("unable to set up TCP listener: {}", boxed_listener.err().unwrap());
428 return Err(message);
429 }
430
431 let listener = boxed_listener.unwrap();
432 let pool = ThreadPool::new(thread_count as usize);
433
434
435 let server_url_thread_count = Log::server_url_thread_count(protocol, &bind_addr, thread_count);
436 println!("{}", server_url_thread_count);
437
438 Ok((listener, pool))
439 }
440
441 pub fn run(listener: TcpListener,
449 pool: ThreadPool,
450 app: impl Application + Send + 'static + Clone) {
451 #[cfg(feature = "http1")]
452 {
453 use std::sync::Arc;
454 use std::sync::atomic::{AtomicBool, Ordering};
455
456 let shutdown = Arc::new(AtomicBool::new(false));
457 let s = shutdown.clone();
458 if let Err(e) = ctrlc::set_handler(move || {
459 s.store(true, Ordering::SeqCst);
460 }) {
461 eprintln!("unable to install signal handler: {}", e);
462 }
463 crate::config_reload::install_sighup_handler();
464 if let Err(e) = listener.set_nonblocking(true) {
465 eprintln!("unable to set non-blocking listener: {}", e);
466 }
467
468 loop {
469 if shutdown.load(Ordering::SeqCst) {
470 break;
471 }
472 if crate::config_reload::RELOAD_REQUESTED
473 .compare_exchange(true, false, Ordering::SeqCst, Ordering::Relaxed)
474 .is_ok()
475 {
476 crate::config_reload::reload();
477 }
478 match listener.accept() {
479 Ok((stream, peer_addr)) => {
480 Server::dispatch_connection(stream, peer_addr, &pool, app.clone());
481 }
482 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
483 std::thread::sleep(Duration::from_millis(10));
484 }
485 Err(e) => {
486 eprintln!("accept error: {}", e);
487 break;
488 }
489 }
490 }
491
492 crate::metrics::SERVER_READY.store(false, std::sync::atomic::Ordering::SeqCst);
493 println!("Shutting down — waiting for in-flight connections to finish");
494 pool.join();
495 println!("Server stopped");
496 }
497
498 #[cfg(not(feature = "http1"))]
499 {
500 for boxed_stream in listener.incoming() {
501 match boxed_stream {
502 Err(e) => {
503 eprintln!("unable to get TCP stream: {}", e);
504 return;
505 }
506 Ok(stream) => {
507 let peer_addr = match stream.peer_addr() {
508 Ok(a) => a,
509 Err(e) => {
510 eprintln!("unable to read peer addr: {}", e);
511 return;
512 }
513 };
514 Server::dispatch_connection(stream, peer_addr, &pool, app.clone());
515 }
516 }
517 }
518 }
519 }
520
521 fn dispatch_connection(
522 stream: std::net::TcpStream,
523 peer_addr: std::net::SocketAddr,
524 pool: &ThreadPool,
525 app: impl Application + Send + 'static + Clone,
526 ) {
527 print!("Connection established, ");
528 if let Ok(local) = stream.local_addr() {
529 print!("local addr: {}", local);
530 }
531 println!(", peer addr: {}", peer_addr);
532
533 let (server_ip, server_port, _thread_count) = get_ip_port_thread_count();
534 let connection = ConnectionInfo {
535 client: Address {
536 ip: peer_addr.ip().to_string(),
537 port: peer_addr.port() as i32,
538 },
539 server: Address {
540 ip: server_ip,
541 port: server_port,
542 },
543 request_size: get_request_allocation_size(),
544 sni_hostname: None,
545 };
546
547 if let Err(e) = stream.set_read_timeout(Some(Duration::from_secs(30))) {
548 eprintln!("failed to set read timeout: {}", e);
549 }
550
551 pool.execute(move || {
552 crate::metrics::connection_open();
553 let result = Server::process(stream, connection, app);
554 crate::metrics::connection_close();
555 if let Err(msg) = result {
556 crate::metrics::record_error();
557 eprintln!("{}", msg);
558 }
559 });
560 }
561
562}
563
/// Per-connection context handed to the application together with each request.
#[derive(Clone)]
pub struct ConnectionInfo {
    /// Address of the remote peer (filled from the accepted socket's peer address).
    pub client: Address,
    /// Address the server is configured with (IP and port from the configuration).
    pub server: Address,
    /// Size in bytes of the buffer used for each read of request data.
    pub request_size: i64,
    /// Host name from the TLS SNI extension; `None` for plain TCP connections.
    pub sni_hostname: Option<String>,
}
577
/// An IP address and port pair kept in textual/integer form.
#[derive(Clone)]
pub struct Address {
    /// IP address as text (IPv4 or IPv6, without square brackets when produced
    /// from a socket address in this file).
    pub ip: String,
    /// Port number; values outside `0..=65535` make `to_socket_addr` return `None`.
    pub port: i32
}
584
585impl ConnectionInfo {
586 pub fn peer_addr(&self) -> Option<std::net::SocketAddr> {
589 self.client.to_socket_addr()
590 }
591}
592
593impl Address {
594 pub fn to_socket_addr(&self) -> Option<std::net::SocketAddr> {
597 let ip: std::net::IpAddr = self.ip.parse().ok()?;
598 let port = u16::try_from(self.port).ok()?;
599 Some(std::net::SocketAddr::new(ip, port))
600 }
601}
602
/// Completes once `recv()` on a SIGTERM signal stream returns.
///
/// On Unix, when the signal stream cannot be created, and on every non-Unix
/// target, the future stays pending forever, so a `select!` branch waiting on
/// it simply never fires.
#[cfg(feature = "http2")]
async fn sigterm() {
    #[cfg(unix)]
    {
        use tokio::signal::unix::{signal, SignalKind};

        match signal(SignalKind::terminate()) {
            Ok(mut terminate) => {
                terminate.recv().await;
            }
            Err(_) => std::future::pending::<()>().await,
        }
    }
    #[cfg(not(unix))]
    std::future::pending::<()>().await
}
620
/// Completes once `recv()` on a SIGHUP signal stream returns.
///
/// On Unix, when the signal stream cannot be created, and on every non-Unix
/// target, the future stays pending forever, so a `select!` branch waiting on
/// it simply never fires.
#[cfg(feature = "http2")]
async fn sighup() {
    #[cfg(unix)]
    {
        use tokio::signal::unix::{signal, SignalKind};

        match signal(SignalKind::hangup()) {
            Ok(mut hangup) => {
                hangup.recv().await;
            }
            Err(_) => std::future::pending::<()>().await,
        }
    }
    #[cfg(not(unix))]
    std::future::pending::<()>().await
}
637
638#[cfg(feature = "http2")]
639impl Server {
640 pub async fn run_tls(
641 listener: TcpListener,
642 pool: ThreadPool,
643 app: impl Application + Send + 'static + Clone,
644 ) {
645 use crate::tls::create_tls_acceptor_from_vhosts;
646 use crate::h2_handler;
647
648 let cert_path = std::env::var(crate::entry_point::Config::RWS_CONFIG_TLS_CERT_FILE)
649 .unwrap_or_default();
650 let key_path = std::env::var(crate::entry_point::Config::RWS_CONFIG_TLS_KEY_FILE)
651 .unwrap_or_default();
652
653 if cert_path.is_empty() || key_path.is_empty() {
654 println!("No TLS certificate configured — serving plain HTTP/1.1.");
655 tokio::task::block_in_place(|| Server::run(listener, pool, app));
656 return;
657 }
658
659 let vhosts = crate::entry_point::get_virtual_hosts();
660 let mut tls_acceptor = match create_tls_acceptor_from_vhosts(&vhosts, &cert_path, &key_path) {
661 Ok(a) => a,
662 Err(e) => {
663 eprintln!("TLS setup failed: {}", e);
664 return;
665 }
666 };
667
668 listener
669 .set_nonblocking(true)
670 .expect("failed to set TCP listener to non-blocking");
671 let tokio_listener = tokio::net::TcpListener::from_std(listener)
672 .expect("failed to convert TCP listener to tokio");
673
674 println!("Listening for TLS connections (HTTP/1.1 + HTTP/2)...");
675
676 loop {
677 tokio::select! {
678 result = tokio_listener.accept() => {
679 match result {
680 Ok((tcp_stream, peer_addr)) => {
681 let acceptor = tls_acceptor.clone();
682 let app = app.clone();
683 tokio::spawn(async move {
684 match acceptor.accept(tcp_stream).await {
685 Ok(tls_stream) => {
686 let server_conn = tls_stream.get_ref().1;
687 let sni = server_conn.server_name().map(|s| s.to_string());
688 let protocol = server_conn
689 .alpn_protocol()
690 .map(|p| p.to_vec());
691
692 match protocol.as_deref() {
693 Some(b"h2") => {
694 if let Err(e) =
695 h2_handler::handle_connection(tls_stream, peer_addr, sni, app)
696 .await
697 {
698 eprintln!("H2 connection error: {}", e);
699 }
700 }
701 _ => {
702 if let Err(e) =
703 Server::process_h1_tls(tls_stream, peer_addr, sni, app).await
704 {
705 eprintln!("H1 TLS error: {}", e);
706 }
707 }
708 }
709 }
710 Err(e) => eprintln!("TLS handshake failed: {}", e),
711 }
712 });
713 }
714 Err(e) => eprintln!("TCP accept error: {}", e),
715 }
716 }
717 _ = tokio::signal::ctrl_c() => {
718 crate::metrics::SERVER_READY.store(false, std::sync::atomic::Ordering::SeqCst);
719 println!("\nShutting down gracefully (SIGINT).");
720 break;
721 }
722 _ = sigterm() => {
723 crate::metrics::SERVER_READY.store(false, std::sync::atomic::Ordering::SeqCst);
724 println!("\nShutting down gracefully (SIGTERM).");
725 break;
726 }
727 _ = sighup() => {
728 crate::config_reload::reload();
729 let vhosts = crate::entry_point::get_virtual_hosts();
730 if let Ok(new_acceptor) = create_tls_acceptor_from_vhosts(&vhosts, &cert_path, &key_path) {
731 tls_acceptor = new_acceptor;
732 println!("[TLS] Certificates reloaded ({} virtual hosts).", vhosts.len());
733 }
734 }
735 }
736 }
737 }
738
739 pub async fn run_redirect() {
743 use std::env;
744 use tokio::io::{AsyncReadExt, AsyncWriteExt};
745 use tokio::net::TcpListener as TokioListener;
746
747 let cert_path = env::var(crate::entry_point::Config::RWS_CONFIG_TLS_CERT_FILE)
748 .unwrap_or_default();
749 if cert_path.is_empty() {
750 return;
751 }
752
753 let redirect_port_str = env::var(crate::entry_point::Config::RWS_CONFIG_HTTP_REDIRECT_PORT)
754 .unwrap_or_default();
755 if redirect_port_str.is_empty() {
756 return;
757 }
758
759 let redirect_port: u16 = match redirect_port_str.parse() {
760 Ok(p) => p,
761 Err(_) => {
762 eprintln!("Invalid RWS_CONFIG_HTTP_REDIRECT_PORT: {}", redirect_port_str);
763 return;
764 }
765 };
766
767 let (server_ip, server_port, _) = get_ip_port_thread_count();
768 let bind_addr = format!("{}:{}", server_ip, redirect_port);
769
770 let listener = match TokioListener::bind(&bind_addr).await {
771 Ok(l) => l,
772 Err(e) => {
773 eprintln!("HTTP redirect listener error on {}: {}", bind_addr, e);
774 return;
775 }
776 };
777
778 println!("HTTP→HTTPS redirect listening on http://{}:{}", server_ip, redirect_port);
779
780 loop {
781 tokio::select! {
782 result = listener.accept() => {
783 match result {
784 Ok((mut stream, _peer)) => {
785 let https_port = server_port;
786 tokio::spawn(async move {
787 let mut buf = vec![0u8; 4096];
788 let n = match stream.read(&mut buf).await {
789 Ok(n) => n,
790 Err(_) => return,
791 };
792 let text = String::from_utf8_lossy(&buf[..n]);
793
794 let uri = text.lines()
795 .next()
796 .and_then(|line| line.split_whitespace().nth(1))
797 .unwrap_or("/")
798 .to_string();
799
800 let host_header = text.lines()
801 .find(|l| l.to_lowercase().starts_with("host:"))
802 .map(|l| l[5..].trim().to_string());
803
804 let location = match host_header {
805 Some(h) => {
806 let h_no_port = if h.starts_with('[') {
808 h.find(']')
810 .map(|i| h[..=i].to_string())
811 .unwrap_or(h.clone())
812 } else {
813 h.rfind(':')
814 .map(|i| h[..i].to_string())
815 .unwrap_or(h.clone())
816 };
817 if https_port == 443 {
818 format!("https://{}{}", h_no_port, uri)
819 } else {
820 format!("https://{}:{}{}", h_no_port, https_port, uri)
821 }
822 }
823 None => format!("https://localhost:{}{}", https_port, uri),
824 };
825
826 let response = format!(
827 "HTTP/1.1 301 Moved Permanently\r\nLocation: {}\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
828 location
829 );
830 let _ = stream.write_all(response.as_bytes()).await;
831 });
832 }
833 Err(e) => eprintln!("HTTP redirect accept error: {}", e),
834 }
835 }
836 _ = tokio::signal::ctrl_c() => {
837 println!("\nShutting down HTTP redirect listener (SIGINT).");
838 break;
839 }
840 _ = sigterm() => {
841 println!("\nShutting down HTTP redirect listener (SIGTERM).");
842 break;
843 }
844 _ = sighup() => {
845 crate::config_reload::reload();
846 }
847 }
848 }
849 }
850
851 async fn process_h1_tls(
852 mut stream: tokio_rustls::server::TlsStream<tokio::net::TcpStream>,
853 peer_addr: std::net::SocketAddr,
854 sni_hostname: Option<String>,
855 app: impl Application,
856 ) -> Result<(), String> {
857 use tokio::io::{AsyncReadExt, AsyncWriteExt};
858
859 let (server_ip, server_port, _) = get_ip_port_thread_count();
860 let request_allocation_size = get_request_allocation_size();
861
862 let mut buffer = vec![0u8; request_allocation_size as usize];
863 let n = match stream.read(&mut buffer).await {
864 Ok(n) => n,
865 Err(e) => {
866 let raw = Server::bad_request_response(e.to_string());
867 let _ = stream.write_all(&raw).await;
868 return Ok(());
869 }
870 };
871
872 let mut request = match Request::parse(&buffer[..n]) {
875 Ok(r) => r,
876 Err(message) => {
877 let raw = Server::bad_request_response(message);
878 let _ = stream.write_all(&raw).await;
879 return Ok(());
880 }
881 };
882
883 if let Some(declared_len) = request
887 .get_header(Header::_CONTENT_LENGTH.to_string())
888 .and_then(|h| h.value.trim().parse::<u64>().ok())
889 {
890 let max_body_size = crate::entry_point::get_max_body_size();
891 if max_body_size > 0 && declared_len > max_body_size {
892 let raw = Server::payload_too_large_response(format!(
893 "413 Payload Too Large: declared Content-Length {} exceeds the {} byte limit",
894 declared_len, max_body_size
895 ));
896 let _ = stream.write_all(&raw).await;
897 return Ok(());
898 }
899
900 while (request.body.len() as u64) < declared_len {
901 let mut more = vec![0u8; request_allocation_size as usize];
902 match stream.read(&mut more).await {
903 Ok(0) | Err(_) => break, Ok(k) => request.body.extend_from_slice(&more[..k]),
905 }
906 }
907 }
908
909 let connection = ConnectionInfo {
910 client: Address {
911 ip: peer_addr.ip().to_string(),
912 port: peer_addr.port() as i32,
913 },
914 server: Address {
915 ip: server_ip,
916 port: server_port,
917 },
918 request_size: request_allocation_size,
919 sni_hostname,
920 };
921
922 let mut response = match app.execute(&request, &connection) {
923 Ok(r) => r,
924 Err(message) => {
925 let raw = Server::bad_request_response(message);
926 let _ = stream.write_all(&raw).await;
927 return Ok(());
928 }
929 };
930
931 crate::metrics::record_request();
932 crate::compression::apply_gzip(&request, &mut response);
933 response.headers.push(Header::get_hsts_header());
934
935 #[cfg(feature = "http3")]
936 response.headers.push(Header {
937 name: Header::_ALT_SVC.to_string(),
938 value: format!("h3=\":{}\"", server_port),
939 });
940 #[cfg(not(feature = "http3"))]
941 response.headers.push(Header {
942 name: Header::_ALT_SVC.to_string(),
943 value: format!("h2=\":{}\"", server_port),
944 });
945
946 Log::log_access(&request, &response, &peer_addr);
947
948 let raw = Response::generate_response(response, request);
949 stream
950 .write_all(&raw)
951 .await
952 .map_err(|e| e.to_string())?;
953 stream.flush().await.map_err(|e| e.to_string())?;
954
955 Ok(())
956 }
957}
958
959#[cfg(feature = "http3")]
960impl Server {
961 pub async fn run_quic(
962 app: impl Application + Send + 'static + Clone,
963 ) {
964 use crate::tls::create_quinn_server_config_from_vhosts;
965 use crate::h3_handler;
966
967 let cert_path = std::env::var(crate::entry_point::Config::RWS_CONFIG_TLS_CERT_FILE)
968 .unwrap_or_default();
969 let key_path = std::env::var(crate::entry_point::Config::RWS_CONFIG_TLS_KEY_FILE)
970 .unwrap_or_default();
971
972 if cert_path.is_empty() || key_path.is_empty() {
973 return;
974 }
975
976 let vhosts = crate::entry_point::get_virtual_hosts();
977 let server_config = match create_quinn_server_config_from_vhosts(&vhosts, &cert_path, &key_path) {
978 Ok(c) => c,
979 Err(e) => {
980 eprintln!("QUIC TLS setup failed: {}", e);
981 return;
982 }
983 };
984
985 let (server_ip, server_port, _) = get_ip_port_thread_count();
986 let bind_addr = format!("{}:{}", server_ip, server_port);
987 let addr: std::net::SocketAddr = match bind_addr.parse() {
988 Ok(a) => a,
989 Err(e) => {
990 eprintln!("Invalid QUIC bind address '{}': {}", bind_addr, e);
991 return;
992 }
993 };
994
995 let endpoint = match quinn::Endpoint::server(server_config, addr) {
996 Ok(e) => e,
997 Err(e) => {
998 eprintln!("QUIC endpoint error: {}", e);
999 return;
1000 }
1001 };
1002
1003 println!("Listening for QUIC/HTTP3 on UDP {}:{}", server_ip, server_port);
1004
1005 loop {
1006 tokio::select! {
1007 maybe = endpoint.accept() => {
1008 match maybe {
1009 Some(incoming) => {
1010 let app = app.clone();
1011 tokio::spawn(async move {
1012 match incoming.await {
1013 Ok(conn) => {
1014 let peer_addr = conn.remote_address();
1015 if let Err(e) = h3_handler::handle_connection(conn, peer_addr, app).await {
1016 eprintln!("H3 connection error: {}", e);
1017 }
1018 }
1019 Err(e) => eprintln!("QUIC connection error: {}", e),
1020 }
1021 });
1022 }
1023 None => break,
1024 }
1025 }
1026 _ = tokio::signal::ctrl_c() => {
1027 crate::metrics::SERVER_READY.store(false, std::sync::atomic::Ordering::SeqCst);
1028 println!("\nShutting down QUIC (SIGINT).");
1029 endpoint.close(0u32.into(), b"shutdown");
1030 break;
1031 }
1032 _ = sigterm() => {
1033 crate::metrics::SERVER_READY.store(false, std::sync::atomic::Ordering::SeqCst);
1034 println!("\nShutting down QUIC (SIGTERM).");
1035 endpoint.close(0u32.into(), b"shutdown");
1036 break;
1037 }
1038 _ = sighup() => {
1039 crate::config_reload::reload();
1040 }
1041 }
1042 }
1043 }
1044}
1045
1046