1#[cfg(test)]
2pub mod tests;
3#[cfg(test)]
4mod example;
5
6use std::io::prelude::*;
7use std::borrow::Borrow;
8use std::net::{IpAddr, SocketAddr, TcpListener};
9use std::str::FromStr;
10use std::time::Duration;
11
12use crate::request::{METHOD, Request};
13use crate::response::{Response, STATUS_CODE_REASON_PHRASE};
14use crate::app::App;
15use crate::application::Application;
16use crate::entry_point::{bootstrap, get_ip_port_thread_count, get_request_allocation_size, set_default_values};
17use crate::header::Header;
18use crate::log::Log;
19use crate::mime_type::MimeType;
20use crate::range::{ContentRange, Range};
21use crate::symbol::SYMBOL;
22use crate::thread_pool::ThreadPool;
23
/// Stateless namespace for the HTTP server entry points.
///
/// `Server` has no fields; all functionality is exposed as associated
/// functions such as `Server::setup`, `Server::run` and `Server::process`.
#[derive(Debug)]
pub struct Server {}
25impl Server {
26 pub fn process_request(mut stream: impl Read + Write + Unpin, peer_addr: SocketAddr) -> Vec<u8> {
27 let request_allocation_size = get_request_allocation_size();
28 let mut buffer = vec![0; request_allocation_size as usize];
29 let boxed_read = stream.read(&mut buffer);
30 if boxed_read.is_err() {
31 let message = boxed_read.err().unwrap().to_string();
32 eprintln!("unable to read TCP stream {}", &message);
33
34 let raw_response = Server::bad_request_response(message);
35 let boxed_stream = stream.write(raw_response.borrow());
36 if boxed_stream.is_ok() {
37 stream.flush().unwrap();
38 };
39 return raw_response;
40 }
41
42 boxed_read.unwrap();
43 let request : &[u8] = &buffer;
44
45 let boxed_request = Request::parse_request(request);
50 if boxed_request.is_err() {
51 let message = boxed_request.err().unwrap();
52 eprintln!("unable to parse request: {}", &message);
53
54 let raw_response = Server::bad_request_response(message);
55 let boxed_stream = stream.write(raw_response.borrow());
56 if boxed_stream.is_ok() {
57 stream.flush().unwrap();
58 };
59 return raw_response;
60 }
61
62
63 let request: Request = boxed_request.unwrap();
64 let (response, request) = App::handle_request(request);
65
66
67 let log_request_response = Log::combined(&request, &response, &peer_addr);
68 println!("{}", log_request_response);
69 let raw_response = Response::generate_response(response, request);
70
71 let boxed_stream = stream.write(raw_response.borrow());
72 if boxed_stream.is_ok() {
73 stream.flush().unwrap();
74 };
75
76 raw_response
77 }
78
79 pub fn bad_request_response(message: String) -> Vec<u8> {
80 let error_request = Request {
81 method: METHOD.get.to_string(),
82 request_uri: "".to_string(),
83 http_version: "".to_string(),
84 headers: vec![],
85 body: vec![],
86 };
87
88 let size = message.chars().count() as u64;
89 let content_range = ContentRange {
90 unit: Range::BYTES.to_string(),
91 range: Range { start: 0, end: size },
92 size: size.to_string(),
93 body: Vec::from(message.as_bytes()),
94 content_type: MimeType::TEXT_PLAIN.to_string(),
95 };
96
97 let header_list = Header::get_header_list(&error_request);
98 let error_response: Response = Response::get_response(
99 STATUS_CODE_REASON_PHRASE.n400_bad_request,
100 Some(header_list),
101 Some(vec![content_range])
102 );
103
104 let response = Response::generate_response(error_response, error_request);
105 return response;
106 }
107
108 pub fn process(mut stream: impl Read + Write + Unpin,
109 connection: ConnectionInfo,
110 app: impl Application) -> Result<(), String> {
111 use crate::http::VERSION;
112
113 let request_allocation_size = connection.request_size;
114 let client = connection.client.clone();
115 let client_addr = SocketAddr::new(IpAddr::from_str(client.ip.as_str()).unwrap(), client.port as u16);
116
117 loop {
118 let mut buffer = vec![0; request_allocation_size as usize];
119 let boxed_read = stream.read(&mut buffer);
120 if boxed_read.is_err() {
121 break;
123 }
124 if boxed_read.unwrap() == 0 {
125 break;
126 }
127
128 let request = match Request::parse(&buffer) {
129 Ok(r) => r,
130 Err(message) => {
131 let raw_response = Server::bad_request_response(message.clone());
132 let boxed_stream = stream.write(raw_response.borrow());
133 if boxed_stream.is_ok() { stream.flush().unwrap(); }
134 return Err(message);
135 }
136 };
137
138 let keep_alive = {
139 let conn_hdr = request.get_header(Header::_CONNECTION.to_string());
140 match conn_hdr {
141 Some(h) => h.value.to_lowercase() != "close",
142 None => request.http_version == VERSION.http_1_1,
143 }
144 };
145
146 let mut response = match app.execute(&request, &connection) {
147 Ok(r) => r,
148 Err(message) => {
149 let raw_response = Server::bad_request_response(message.clone());
150 let boxed_stream = stream.write(raw_response.borrow());
151 if boxed_stream.is_ok() { stream.flush().unwrap(); }
152 return Err(message);
153 }
154 };
155
156 crate::metrics::record_request();
157 crate::compression::apply_gzip(&request, &mut response);
158
159 response.headers.push(Header {
160 name: Header::_CONNECTION.to_string(),
161 value: if keep_alive { "keep-alive".to_string() } else { "close".to_string() },
162 });
163
164 Log::log_access(&request, &response, &client_addr);
165
166 if let Some(reader) = response.stream_pipe.take() {
167 if let Err(e) = Server::pipe_stream(&mut stream, response, request, reader) {
168 return Err(e);
169 }
170 } else if let Some(ref filepath) = response.stream_file.clone() {
171 if let Err(e) = Server::write_chunked_file(&mut stream, response, request, filepath) {
172 return Err(e);
173 }
174 } else {
175 let raw_response = Response::generate_response(response, request);
176 if let Err(e) = stream.write(raw_response.borrow()) {
177 return Err(e.to_string());
178 }
179 stream.flush().unwrap();
180 }
181
182 if !keep_alive { break; }
183 }
184
185 Ok(())
186 }
187
188 pub(crate) fn write_chunked_file(
191 stream: &mut impl Write,
192 mut response: Response,
193 request: Request,
194 filepath: &str,
195 ) -> Result<(), String> {
196 use std::fs::File;
197 use std::io::Read as _;
198
199 response.headers.push(Header {
200 name: Header::_TRANSFER_ENCODING.to_string(),
201 value: "chunked".to_string(),
202 });
203
204 let status = [
206 response.http_version.clone(),
207 response.status_code.to_string(),
208 response.reason_phrase.clone(),
209 ].join(SYMBOL.whitespace);
210
211 let mut headers_str = SYMBOL.new_line_carriage_return.to_string();
212 for header in &response.headers {
213 headers_str.push_str(&header.name);
214 headers_str.push_str(Header::NAME_VALUE_SEPARATOR);
215 headers_str.push_str(&header.value);
216 headers_str.push_str(SYMBOL.new_line_carriage_return);
217 }
218 let head = format!("{}{}{}", status, headers_str, SYMBOL.new_line_carriage_return);
219
220 stream.write_all(head.as_bytes()).map_err(|e| e.to_string())?;
221
222 if request.method != METHOD.head && request.method != METHOD.options {
223 let mut file = File::open(filepath).map_err(|e| e.to_string())?;
224 let mut buf = vec![0u8; 65536];
225 loop {
226 let n = file.read(&mut buf).map_err(|e| e.to_string())?;
227 if n == 0 { break; }
228 stream.write_all(format!("{:x}\r\n", n).as_bytes()).map_err(|e| e.to_string())?;
230 stream.write_all(&buf[..n]).map_err(|e| e.to_string())?;
231 stream.write_all(b"\r\n").map_err(|e| e.to_string())?;
232 }
233 stream.write_all(b"0\r\n\r\n").map_err(|e| e.to_string())?;
235 }
236
237 stream.flush().map_err(|e| e.to_string())
238 }
239
240 pub(crate) fn pipe_stream(
248 stream: &mut impl Write,
249 mut response: Response,
250 request: Request,
251 mut reader: Box<dyn std::io::Read + Send>,
252 ) -> Result<(), String> {
253 let backend_is_chunked = response.headers.iter().any(|h| {
257 h.name.eq_ignore_ascii_case("transfer-encoding")
258 && h.value.to_lowercase().contains("chunked")
259 });
260
261 response.headers.retain(|h| !h.name.eq_ignore_ascii_case("content-length"));
262 if !backend_is_chunked {
263 response.headers.push(Header {
264 name: Header::_TRANSFER_ENCODING.to_string(),
265 value: "chunked".to_string(),
266 });
267 }
268
269 let status = [
270 response.http_version.clone(),
271 response.status_code.to_string(),
272 response.reason_phrase.clone(),
273 ].join(SYMBOL.whitespace);
274
275 let mut head = format!("{}\r\n", status);
276 for header in &response.headers {
277 head.push_str(&header.name);
278 head.push_str(Header::NAME_VALUE_SEPARATOR);
279 head.push_str(&header.value);
280 head.push_str(SYMBOL.new_line_carriage_return);
281 }
282 head.push_str(SYMBOL.new_line_carriage_return);
283 stream.write_all(head.as_bytes()).map_err(|e| e.to_string())?;
284
285 if request.method != METHOD.head && request.method != METHOD.options {
286 let mut buf = [0u8; 8192];
287 if backend_is_chunked {
288 loop {
291 match reader.read(&mut buf) {
292 Ok(0) | Err(_) => break,
293 Ok(n) => {
294 stream.write_all(&buf[..n]).map_err(|e| e.to_string())?;
295 stream.flush().map_err(|e| e.to_string())?;
296 }
297 }
298 }
299 } else {
300 loop {
304 match reader.read(&mut buf) {
305 Ok(0) | Err(_) => break,
306 Ok(n) => {
307 stream
308 .write_all(format!("{:x}\r\n", n).as_bytes())
309 .map_err(|e| e.to_string())?;
310 stream.write_all(&buf[..n]).map_err(|e| e.to_string())?;
311 stream.write_all(b"\r\n").map_err(|e| e.to_string())?;
312 stream.flush().map_err(|e| e.to_string())?;
313 }
314 }
315 }
316 stream.write_all(b"0\r\n\r\n").map_err(|e| e.to_string())?;
317 }
318 }
319
320 stream.flush().map_err(|e| e.to_string())
321 }
322
323 pub fn setup() -> Result<(TcpListener, ThreadPool), String> {
326 let info = Log::info("Rust Web Server");
327 println!("{}", info);
328
329 let usage_info = Log::usage_information();
330 println!("{}", usage_info);
331
332
333 println!("RWS Configuration Start: \n");
334
335 set_default_values();
336 bootstrap();
337
338 println!("\nRWS Configuration End\n\n");
339
340
341 let (ip, port, thread_count) = get_ip_port_thread_count();
342
343
344 let mut ip_readable = ip.to_string();
345
346 if ip.contains(":") {
347 ip_readable = [SYMBOL.opening_square_bracket, &ip, SYMBOL.closing_square_bracket].join("");
348 }
349
350 let bind_addr = [ip_readable, SYMBOL.colon.to_string(), port.to_string()].join(SYMBOL.empty_string);
351
352 #[cfg(feature = "http2")]
353 let protocol = {
354 let cert = std::env::var(crate::entry_point::Config::RWS_CONFIG_TLS_CERT_FILE).unwrap_or_default();
355 if cert.is_empty() { "http" } else { "https" }
356 };
357 #[cfg(not(feature = "http2"))]
358 let protocol = "http";
359
360 println!("Setting up {}://{}...", protocol, &bind_addr);
361
362 let boxed_listener = TcpListener::bind(&bind_addr);
363 if boxed_listener.is_err() {
364 let message = format!("unable to set up TCP listener: {}", boxed_listener.err().unwrap());
365 return Err(message);
366 }
367
368 let listener = boxed_listener.unwrap();
369 let pool = ThreadPool::new(thread_count as usize);
370
371
372 let server_url_thread_count = Log::server_url_thread_count(protocol, &bind_addr, thread_count);
373 println!("{}", server_url_thread_count);
374
375 Ok((listener, pool))
376 }
377
378 pub fn run(listener: TcpListener,
386 pool: ThreadPool,
387 app: impl Application + Send + 'static + Clone) {
388 #[cfg(feature = "http1")]
389 {
390 use std::sync::Arc;
391 use std::sync::atomic::{AtomicBool, Ordering};
392
393 let shutdown = Arc::new(AtomicBool::new(false));
394 let s = shutdown.clone();
395 if let Err(e) = ctrlc::set_handler(move || {
396 s.store(true, Ordering::SeqCst);
397 }) {
398 eprintln!("unable to install signal handler: {}", e);
399 }
400 crate::config_reload::install_sighup_handler();
401 if let Err(e) = listener.set_nonblocking(true) {
402 eprintln!("unable to set non-blocking listener: {}", e);
403 }
404
405 loop {
406 if shutdown.load(Ordering::SeqCst) {
407 break;
408 }
409 if crate::config_reload::RELOAD_REQUESTED
410 .compare_exchange(true, false, Ordering::SeqCst, Ordering::Relaxed)
411 .is_ok()
412 {
413 crate::config_reload::reload();
414 }
415 match listener.accept() {
416 Ok((stream, peer_addr)) => {
417 Server::dispatch_connection(stream, peer_addr, &pool, app.clone());
418 }
419 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
420 std::thread::sleep(Duration::from_millis(10));
421 }
422 Err(e) => {
423 eprintln!("accept error: {}", e);
424 break;
425 }
426 }
427 }
428
429 crate::metrics::SERVER_READY.store(false, std::sync::atomic::Ordering::SeqCst);
430 println!("Shutting down — waiting for in-flight connections to finish");
431 pool.join();
432 println!("Server stopped");
433 }
434
435 #[cfg(not(feature = "http1"))]
436 {
437 for boxed_stream in listener.incoming() {
438 match boxed_stream {
439 Err(e) => {
440 eprintln!("unable to get TCP stream: {}", e);
441 return;
442 }
443 Ok(stream) => {
444 let peer_addr = match stream.peer_addr() {
445 Ok(a) => a,
446 Err(e) => {
447 eprintln!("unable to read peer addr: {}", e);
448 return;
449 }
450 };
451 Server::dispatch_connection(stream, peer_addr, &pool, app.clone());
452 }
453 }
454 }
455 }
456 }
457
458 fn dispatch_connection(
459 stream: std::net::TcpStream,
460 peer_addr: std::net::SocketAddr,
461 pool: &ThreadPool,
462 app: impl Application + Send + 'static + Clone,
463 ) {
464 print!("Connection established, ");
465 if let Ok(local) = stream.local_addr() {
466 print!("local addr: {}", local);
467 }
468 println!(", peer addr: {}", peer_addr);
469
470 let (server_ip, server_port, _thread_count) = get_ip_port_thread_count();
471 let connection = ConnectionInfo {
472 client: Address {
473 ip: peer_addr.ip().to_string(),
474 port: peer_addr.port() as i32,
475 },
476 server: Address {
477 ip: server_ip,
478 port: server_port,
479 },
480 request_size: get_request_allocation_size(),
481 sni_hostname: None,
482 };
483
484 if let Err(e) = stream.set_read_timeout(Some(Duration::from_secs(30))) {
485 eprintln!("failed to set read timeout: {}", e);
486 }
487
488 pool.execute(move || {
489 crate::metrics::connection_open();
490 let result = Server::process(stream, connection, app);
491 crate::metrics::connection_close();
492 if let Err(msg) = result {
493 crate::metrics::record_error();
494 eprintln!("{}", msg);
495 }
496 });
497 }
498
499}
500
/// Describes one accepted connection: who is talking to the server and how
/// requests on it should be read.
#[derive(Debug, Clone)]
pub struct ConnectionInfo {
    /// Address of the remote peer.
    pub client: Address,
    /// Address the server is configured with.
    pub server: Address,
    /// Size in bytes of the buffer allocated for reading a request.
    pub request_size: i64,
    /// Server name sent by the client during the TLS handshake; `None` for
    /// connections accepted without TLS.
    pub sni_hostname: Option<String>,
}

/// An IP address in textual form together with a port.
#[derive(Debug, Clone)]
pub struct Address {
    /// IP address as text, e.g. `127.0.0.1` or `::1`.
    pub ip: String,
    /// Port number; kept as `i32` for compatibility with existing callers.
    pub port: i32
}
521
522impl ConnectionInfo {
523 pub fn peer_addr(&self) -> Option<std::net::SocketAddr> {
526 self.client.to_socket_addr()
527 }
528}
529
530impl Address {
531 pub fn to_socket_addr(&self) -> Option<std::net::SocketAddr> {
534 let ip: std::net::IpAddr = self.ip.parse().ok()?;
535 let port = u16::try_from(self.port).ok()?;
536 Some(std::net::SocketAddr::new(ip, port))
537 }
538}
539
/// Completes when the process receives `SIGTERM`.
///
/// On Unix, if the signal listener cannot be created the future stays
/// pending forever; on other platforms it is always pending.
#[cfg(feature = "http2")]
async fn sigterm() {
    #[cfg(unix)]
    {
        use tokio::signal::unix::{signal, SignalKind};

        match signal(SignalKind::terminate()) {
            Ok(mut listener) => {
                listener.recv().await;
            }
            Err(_) => std::future::pending::<()>().await,
        }
    }
    #[cfg(not(unix))]
    std::future::pending::<()>().await
}
557
/// Completes when the process receives `SIGHUP`.
///
/// On Unix, if the signal listener cannot be created the future stays
/// pending forever; on other platforms it is always pending.
#[cfg(feature = "http2")]
async fn sighup() {
    #[cfg(unix)]
    {
        use tokio::signal::unix::{signal, SignalKind};

        match signal(SignalKind::hangup()) {
            Ok(mut listener) => {
                listener.recv().await;
            }
            Err(_) => std::future::pending::<()>().await,
        }
    }
    #[cfg(not(unix))]
    std::future::pending::<()>().await
}
574
575#[cfg(feature = "http2")]
576impl Server {
577 pub async fn run_tls(
578 listener: TcpListener,
579 pool: ThreadPool,
580 app: impl Application + Send + 'static + Clone,
581 ) {
582 use crate::tls::create_tls_acceptor_from_vhosts;
583 use crate::h2_handler;
584
585 let cert_path = std::env::var(crate::entry_point::Config::RWS_CONFIG_TLS_CERT_FILE)
586 .unwrap_or_default();
587 let key_path = std::env::var(crate::entry_point::Config::RWS_CONFIG_TLS_KEY_FILE)
588 .unwrap_or_default();
589
590 if cert_path.is_empty() || key_path.is_empty() {
591 println!("No TLS certificate configured — serving plain HTTP/1.1.");
592 tokio::task::block_in_place(|| Server::run(listener, pool, app));
593 return;
594 }
595
596 let vhosts = crate::entry_point::get_virtual_hosts();
597 let mut tls_acceptor = match create_tls_acceptor_from_vhosts(&vhosts, &cert_path, &key_path) {
598 Ok(a) => a,
599 Err(e) => {
600 eprintln!("TLS setup failed: {}", e);
601 return;
602 }
603 };
604
605 listener
606 .set_nonblocking(true)
607 .expect("failed to set TCP listener to non-blocking");
608 let tokio_listener = tokio::net::TcpListener::from_std(listener)
609 .expect("failed to convert TCP listener to tokio");
610
611 println!("Listening for TLS connections (HTTP/1.1 + HTTP/2)...");
612
613 loop {
614 tokio::select! {
615 result = tokio_listener.accept() => {
616 match result {
617 Ok((tcp_stream, peer_addr)) => {
618 let acceptor = tls_acceptor.clone();
619 let app = app.clone();
620 tokio::spawn(async move {
621 match acceptor.accept(tcp_stream).await {
622 Ok(tls_stream) => {
623 let server_conn = tls_stream.get_ref().1;
624 let sni = server_conn.server_name().map(|s| s.to_string());
625 let protocol = server_conn
626 .alpn_protocol()
627 .map(|p| p.to_vec());
628
629 match protocol.as_deref() {
630 Some(b"h2") => {
631 if let Err(e) =
632 h2_handler::handle_connection(tls_stream, peer_addr, sni, app)
633 .await
634 {
635 eprintln!("H2 connection error: {}", e);
636 }
637 }
638 _ => {
639 if let Err(e) =
640 Server::process_h1_tls(tls_stream, peer_addr, sni, app).await
641 {
642 eprintln!("H1 TLS error: {}", e);
643 }
644 }
645 }
646 }
647 Err(e) => eprintln!("TLS handshake failed: {}", e),
648 }
649 });
650 }
651 Err(e) => eprintln!("TCP accept error: {}", e),
652 }
653 }
654 _ = tokio::signal::ctrl_c() => {
655 crate::metrics::SERVER_READY.store(false, std::sync::atomic::Ordering::SeqCst);
656 println!("\nShutting down gracefully (SIGINT).");
657 break;
658 }
659 _ = sigterm() => {
660 crate::metrics::SERVER_READY.store(false, std::sync::atomic::Ordering::SeqCst);
661 println!("\nShutting down gracefully (SIGTERM).");
662 break;
663 }
664 _ = sighup() => {
665 crate::config_reload::reload();
666 let vhosts = crate::entry_point::get_virtual_hosts();
667 if let Ok(new_acceptor) = create_tls_acceptor_from_vhosts(&vhosts, &cert_path, &key_path) {
668 tls_acceptor = new_acceptor;
669 println!("[TLS] Certificates reloaded ({} virtual hosts).", vhosts.len());
670 }
671 }
672 }
673 }
674 }
675
676 pub async fn run_redirect() {
680 use std::env;
681 use tokio::io::{AsyncReadExt, AsyncWriteExt};
682 use tokio::net::TcpListener as TokioListener;
683
684 let cert_path = env::var(crate::entry_point::Config::RWS_CONFIG_TLS_CERT_FILE)
685 .unwrap_or_default();
686 if cert_path.is_empty() {
687 return;
688 }
689
690 let redirect_port_str = env::var(crate::entry_point::Config::RWS_CONFIG_HTTP_REDIRECT_PORT)
691 .unwrap_or_default();
692 if redirect_port_str.is_empty() {
693 return;
694 }
695
696 let redirect_port: u16 = match redirect_port_str.parse() {
697 Ok(p) => p,
698 Err(_) => {
699 eprintln!("Invalid RWS_CONFIG_HTTP_REDIRECT_PORT: {}", redirect_port_str);
700 return;
701 }
702 };
703
704 let (server_ip, server_port, _) = get_ip_port_thread_count();
705 let bind_addr = format!("{}:{}", server_ip, redirect_port);
706
707 let listener = match TokioListener::bind(&bind_addr).await {
708 Ok(l) => l,
709 Err(e) => {
710 eprintln!("HTTP redirect listener error on {}: {}", bind_addr, e);
711 return;
712 }
713 };
714
715 println!("HTTP→HTTPS redirect listening on http://{}:{}", server_ip, redirect_port);
716
717 loop {
718 tokio::select! {
719 result = listener.accept() => {
720 match result {
721 Ok((mut stream, _peer)) => {
722 let https_port = server_port;
723 tokio::spawn(async move {
724 let mut buf = vec![0u8; 4096];
725 let n = match stream.read(&mut buf).await {
726 Ok(n) => n,
727 Err(_) => return,
728 };
729 let text = String::from_utf8_lossy(&buf[..n]);
730
731 let uri = text.lines()
732 .next()
733 .and_then(|line| line.split_whitespace().nth(1))
734 .unwrap_or("/")
735 .to_string();
736
737 let host_header = text.lines()
738 .find(|l| l.to_lowercase().starts_with("host:"))
739 .map(|l| l[5..].trim().to_string());
740
741 let location = match host_header {
742 Some(h) => {
743 let h_no_port = if h.starts_with('[') {
745 h.find(']')
747 .map(|i| h[..=i].to_string())
748 .unwrap_or(h.clone())
749 } else {
750 h.rfind(':')
751 .map(|i| h[..i].to_string())
752 .unwrap_or(h.clone())
753 };
754 if https_port == 443 {
755 format!("https://{}{}", h_no_port, uri)
756 } else {
757 format!("https://{}:{}{}", h_no_port, https_port, uri)
758 }
759 }
760 None => format!("https://localhost:{}{}", https_port, uri),
761 };
762
763 let response = format!(
764 "HTTP/1.1 301 Moved Permanently\r\nLocation: {}\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
765 location
766 );
767 let _ = stream.write_all(response.as_bytes()).await;
768 });
769 }
770 Err(e) => eprintln!("HTTP redirect accept error: {}", e),
771 }
772 }
773 _ = tokio::signal::ctrl_c() => {
774 println!("\nShutting down HTTP redirect listener (SIGINT).");
775 break;
776 }
777 _ = sigterm() => {
778 println!("\nShutting down HTTP redirect listener (SIGTERM).");
779 break;
780 }
781 _ = sighup() => {
782 crate::config_reload::reload();
783 }
784 }
785 }
786 }
787
788 async fn process_h1_tls(
789 mut stream: tokio_rustls::server::TlsStream<tokio::net::TcpStream>,
790 peer_addr: std::net::SocketAddr,
791 sni_hostname: Option<String>,
792 app: impl Application,
793 ) -> Result<(), String> {
794 use tokio::io::{AsyncReadExt, AsyncWriteExt};
795
796 let (server_ip, server_port, _) = get_ip_port_thread_count();
797 let request_allocation_size = get_request_allocation_size();
798
799 let mut buffer = vec![0u8; request_allocation_size as usize];
800 if let Err(e) = stream.read(&mut buffer).await {
801 let raw = Server::bad_request_response(e.to_string());
802 let _ = stream.write_all(&raw).await;
803 return Ok(());
804 }
805
806 let request = match Request::parse(&buffer) {
807 Ok(r) => r,
808 Err(message) => {
809 let raw = Server::bad_request_response(message);
810 let _ = stream.write_all(&raw).await;
811 return Ok(());
812 }
813 };
814
815 let connection = ConnectionInfo {
816 client: Address {
817 ip: peer_addr.ip().to_string(),
818 port: peer_addr.port() as i32,
819 },
820 server: Address {
821 ip: server_ip,
822 port: server_port,
823 },
824 request_size: request_allocation_size,
825 sni_hostname,
826 };
827
828 let mut response = match app.execute(&request, &connection) {
829 Ok(r) => r,
830 Err(message) => {
831 let raw = Server::bad_request_response(message);
832 let _ = stream.write_all(&raw).await;
833 return Ok(());
834 }
835 };
836
837 crate::metrics::record_request();
838 crate::compression::apply_gzip(&request, &mut response);
839 response.headers.push(Header::get_hsts_header());
840
841 #[cfg(feature = "http3")]
842 response.headers.push(Header {
843 name: Header::_ALT_SVC.to_string(),
844 value: format!("h3=\":{}\"", server_port),
845 });
846 #[cfg(not(feature = "http3"))]
847 response.headers.push(Header {
848 name: Header::_ALT_SVC.to_string(),
849 value: format!("h2=\":{}\"", server_port),
850 });
851
852 Log::log_access(&request, &response, &peer_addr);
853
854 let raw = Response::generate_response(response, request);
855 stream
856 .write_all(&raw)
857 .await
858 .map_err(|e| e.to_string())?;
859 stream.flush().await.map_err(|e| e.to_string())?;
860
861 Ok(())
862 }
863}
864
865#[cfg(feature = "http3")]
866impl Server {
867 pub async fn run_quic(
868 app: impl Application + Send + 'static + Clone,
869 ) {
870 use crate::tls::create_quinn_server_config_from_vhosts;
871 use crate::h3_handler;
872
873 let cert_path = std::env::var(crate::entry_point::Config::RWS_CONFIG_TLS_CERT_FILE)
874 .unwrap_or_default();
875 let key_path = std::env::var(crate::entry_point::Config::RWS_CONFIG_TLS_KEY_FILE)
876 .unwrap_or_default();
877
878 if cert_path.is_empty() || key_path.is_empty() {
879 return;
880 }
881
882 let vhosts = crate::entry_point::get_virtual_hosts();
883 let server_config = match create_quinn_server_config_from_vhosts(&vhosts, &cert_path, &key_path) {
884 Ok(c) => c,
885 Err(e) => {
886 eprintln!("QUIC TLS setup failed: {}", e);
887 return;
888 }
889 };
890
891 let (server_ip, server_port, _) = get_ip_port_thread_count();
892 let bind_addr = format!("{}:{}", server_ip, server_port);
893 let addr: std::net::SocketAddr = match bind_addr.parse() {
894 Ok(a) => a,
895 Err(e) => {
896 eprintln!("Invalid QUIC bind address '{}': {}", bind_addr, e);
897 return;
898 }
899 };
900
901 let endpoint = match quinn::Endpoint::server(server_config, addr) {
902 Ok(e) => e,
903 Err(e) => {
904 eprintln!("QUIC endpoint error: {}", e);
905 return;
906 }
907 };
908
909 println!("Listening for QUIC/HTTP3 on UDP {}:{}", server_ip, server_port);
910
911 loop {
912 tokio::select! {
913 maybe = endpoint.accept() => {
914 match maybe {
915 Some(incoming) => {
916 let app = app.clone();
917 tokio::spawn(async move {
918 match incoming.await {
919 Ok(conn) => {
920 let peer_addr = conn.remote_address();
921 if let Err(e) = h3_handler::handle_connection(conn, peer_addr, app).await {
922 eprintln!("H3 connection error: {}", e);
923 }
924 }
925 Err(e) => eprintln!("QUIC connection error: {}", e),
926 }
927 });
928 }
929 None => break,
930 }
931 }
932 _ = tokio::signal::ctrl_c() => {
933 crate::metrics::SERVER_READY.store(false, std::sync::atomic::Ordering::SeqCst);
934 println!("\nShutting down QUIC (SIGINT).");
935 endpoint.close(0u32.into(), b"shutdown");
936 break;
937 }
938 _ = sigterm() => {
939 crate::metrics::SERVER_READY.store(false, std::sync::atomic::Ordering::SeqCst);
940 println!("\nShutting down QUIC (SIGTERM).");
941 endpoint.close(0u32.into(), b"shutdown");
942 break;
943 }
944 _ = sighup() => {
945 crate::config_reload::reload();
946 }
947 }
948 }
949 }
950}
951
952