Skip to main content

rust_web_server/server/
mod.rs

1#[cfg(test)]
2pub mod tests;
3#[cfg(test)]
4mod example;
5
6use std::io::prelude::*;
7use std::borrow::Borrow;
8use std::net::{IpAddr, SocketAddr, TcpListener};
9use std::str::FromStr;
10use std::time::Duration;
11
12use crate::request::{METHOD, Request};
13use crate::response::{Response, STATUS_CODE_REASON_PHRASE};
14use crate::app::App;
15use crate::application::Application;
16use crate::entry_point::{bootstrap, get_ip_port_thread_count, get_request_allocation_size, set_default_values};
17use crate::header::Header;
18use crate::log::Log;
19use crate::mime_type::MimeType;
20use crate::range::{ContentRange, Range};
21use crate::symbol::SYMBOL;
22use crate::thread_pool::ThreadPool;
23
/// Stateless namespace type: every server entry point in this module is an
/// associated function on `Server`, and the struct itself carries no fields.
pub struct Server {}
25impl Server {
26    pub fn process_request(mut stream: impl Read + Write + Unpin, peer_addr: SocketAddr) -> Vec<u8> {
27        let request_allocation_size = get_request_allocation_size();
28        let mut buffer = vec![0; request_allocation_size as usize];
29        let boxed_read = stream.read(&mut buffer);
30        if boxed_read.is_err() {
31            let message = boxed_read.err().unwrap().to_string();
32            eprintln!("unable to read TCP stream {}", &message);
33
34            let raw_response = Server::bad_request_response(message);
35            let boxed_stream = stream.write(raw_response.borrow());
36            if boxed_stream.is_ok() {
37                stream.flush().unwrap();
38            };
39            return raw_response;
40        }
41
42        boxed_read.unwrap();
43        let request : &[u8] = &buffer;
44
45        // let raw_request = String::from_utf8(Vec::from(request)).unwrap();
46        // println!("\n\n______{}______\n\n", raw_request);
47
48
49        let boxed_request = Request::parse_request(request);
50        if boxed_request.is_err() {
51            let message = boxed_request.err().unwrap();
52            eprintln!("unable to parse request: {}", &message);
53
54            let raw_response = Server::bad_request_response(message);
55            let boxed_stream = stream.write(raw_response.borrow());
56            if boxed_stream.is_ok() {
57                stream.flush().unwrap();
58            };
59            return raw_response;
60        }
61
62
63        let request: Request = boxed_request.unwrap();
64        let (response, request) = App::handle_request(request);
65
66
67        let log_request_response = Log::combined(&request, &response, &peer_addr);
68        println!("{}", log_request_response);
69        let raw_response = Response::generate_response(response, request);
70
71        let boxed_stream = stream.write(raw_response.borrow());
72        if boxed_stream.is_ok() {
73            stream.flush().unwrap();
74        };
75
76        raw_response
77    }
78
79    pub fn bad_request_response(message: String) -> Vec<u8> {
80        let error_request = Request {
81            method: METHOD.get.to_string(),
82            request_uri: "".to_string(),
83            http_version: "".to_string(),
84            headers: vec![],
85            body: vec![],
86        };
87
88        let size = message.chars().count() as u64;
89        let content_range = ContentRange {
90            unit: Range::BYTES.to_string(),
91            range: Range { start: 0, end: size },
92            size: size.to_string(),
93            body: Vec::from(message.as_bytes()),
94            content_type: MimeType::TEXT_PLAIN.to_string(),
95        };
96
97        let header_list = Header::get_header_list(&error_request);
98        let error_response: Response = Response::get_response(
99            STATUS_CODE_REASON_PHRASE.n400_bad_request,
100            Some(header_list),
101            Some(vec![content_range])
102        );
103
104        let response = Response::generate_response(error_response, error_request);
105        return response;
106    }
107
108    /// Builds a `413 Payload Too Large` raw response for a request whose declared
109    /// `Content-Length` exceeds `RWS_CONFIG_MAX_BODY_SIZE_IN_BYTES`. The connection
110    /// is always closed after this is sent (see [`Server::process`]) — the client's
111    /// unread body bytes are still queued on the socket, so keep-alive framing
112    /// can't be trusted.
113    pub fn payload_too_large_response(message: String) -> Vec<u8> {
114        let error_request = Request {
115            method: METHOD.get.to_string(),
116            request_uri: "".to_string(),
117            http_version: "".to_string(),
118            headers: vec![],
119            body: vec![],
120        };
121
122        let size = message.chars().count() as u64;
123        let content_range = ContentRange {
124            unit: Range::BYTES.to_string(),
125            range: Range { start: 0, end: size },
126            size: size.to_string(),
127            body: Vec::from(message.as_bytes()),
128            content_type: MimeType::TEXT_PLAIN.to_string(),
129        };
130
131        let mut header_list = Header::get_header_list(&error_request);
132        header_list.push(Header { name: Header::_CONNECTION.to_string(), value: "close".to_string() });
133        let error_response: Response = Response::get_response(
134            STATUS_CODE_REASON_PHRASE.n413_payload_too_large,
135            Some(header_list),
136            Some(vec![content_range])
137        );
138
139        Response::generate_response(error_response, error_request)
140    }
141
142    pub fn process(mut stream: impl Read + Write + Unpin,
143                   connection: ConnectionInfo,
144                   app: impl Application) -> Result<(), String> {
145        use crate::http::VERSION;
146
147        let request_allocation_size = connection.request_size;
148        let client = connection.client.clone();
149        let client_addr = SocketAddr::new(IpAddr::from_str(client.ip.as_str()).unwrap(), client.port as u16);
150
151        let max_body_size = crate::entry_point::get_max_body_size();
152
153        loop {
154            let mut buffer = vec![0; request_allocation_size as usize];
155            let boxed_read = stream.read(&mut buffer);
156            let n = match boxed_read {
157                // timeout or client closed — normal end of keep-alive session
158                Err(_) => break,
159                Ok(0) => break,
160                Ok(n) => n,
161            };
162
163            // Trim to the bytes actually read — passing the zero-padded rest of
164            // `buffer` to Request::parse would append trailing NUL bytes to the body.
165            let mut request = match Request::parse(&buffer[..n]) {
166                Ok(r) => r,
167                Err(message) => {
168                    let raw_response = Server::bad_request_response(message.clone());
169                    let boxed_stream = stream.write(raw_response.borrow());
170                    if boxed_stream.is_ok() { stream.flush().unwrap(); }
171                    return Err(message);
172                }
173            };
174
175            // If Content-Length declares more body than this one read delivered,
176            // keep reading until it's fully received — a single `request_allocation_size`
177            // read is not big enough for bodies larger than that buffer.
178            if let Some(declared_len) = request
179                .get_header(Header::_CONTENT_LENGTH.to_string())
180                .and_then(|h| h.value.trim().parse::<u64>().ok())
181            {
182                if max_body_size > 0 && declared_len > max_body_size {
183                    let raw_response = Server::payload_too_large_response(format!(
184                        "413 Payload Too Large: declared Content-Length {} exceeds the {} byte limit",
185                        declared_len, max_body_size
186                    ));
187                    let boxed_stream = stream.write(raw_response.borrow());
188                    if boxed_stream.is_ok() { stream.flush().unwrap(); }
189                    break;
190                }
191
192                while (request.body.len() as u64) < declared_len {
193                    let mut more = vec![0u8; request_allocation_size as usize];
194                    match stream.read(&mut more) {
195                        Ok(0) | Err(_) => break, // peer closed early; proceed with the short body
196                        Ok(k) => request.body.extend_from_slice(&more[..k]),
197                    }
198                }
199            }
200
201            let keep_alive = {
202                let conn_hdr = request.get_header(Header::_CONNECTION.to_string());
203                match conn_hdr {
204                    Some(h) => h.value.to_lowercase() != "close",
205                    None => request.http_version == VERSION.http_1_1,
206                }
207            };
208
209            let mut response = match app.execute(&request, &connection) {
210                Ok(r) => r,
211                Err(message) => {
212                    let raw_response = Server::bad_request_response(message.clone());
213                    let boxed_stream = stream.write(raw_response.borrow());
214                    if boxed_stream.is_ok() { stream.flush().unwrap(); }
215                    return Err(message);
216                }
217            };
218
219            crate::metrics::record_request();
220            crate::compression::apply_gzip(&request, &mut response);
221
222            response.headers.push(Header {
223                name: Header::_CONNECTION.to_string(),
224                value: if keep_alive { "keep-alive".to_string() } else { "close".to_string() },
225            });
226
227            Log::log_access(&request, &response, &client_addr);
228
229            if let Some(reader) = response.stream_pipe.take() {
230                if let Err(e) = Server::pipe_stream(&mut stream, response, request, reader) {
231                    return Err(e);
232                }
233            } else if let Some(ref filepath) = response.stream_file.clone() {
234                if let Err(e) = Server::write_chunked_file(&mut stream, response, request, filepath) {
235                    return Err(e);
236                }
237            } else {
238                let raw_response = Response::generate_response(response, request);
239                if let Err(e) = stream.write(raw_response.borrow()) {
240                    return Err(e.to_string());
241                }
242                stream.flush().unwrap();
243            }
244
245            if !keep_alive { break; }
246        }
247
248        Ok(())
249    }
250
251    /// Streams a file to `stream` using HTTP/1.1 chunked transfer encoding.
252    /// The response headers are written first, then the file is read and written in 64 KB chunks.
253    pub(crate) fn write_chunked_file(
254        stream: &mut impl Write,
255        mut response: Response,
256        request: Request,
257        filepath: &str,
258    ) -> Result<(), String> {
259        use std::fs::File;
260        use std::io::Read as _;
261
262        response.headers.push(Header {
263            name: Header::_TRANSFER_ENCODING.to_string(),
264            value: "chunked".to_string(),
265        });
266
267        // build status line + headers (no body)
268        let status = [
269            response.http_version.clone(),
270            response.status_code.to_string(),
271            response.reason_phrase.clone(),
272        ].join(SYMBOL.whitespace);
273
274        let mut headers_str = SYMBOL.new_line_carriage_return.to_string();
275        for header in &response.headers {
276            headers_str.push_str(&header.name);
277            headers_str.push_str(Header::NAME_VALUE_SEPARATOR);
278            headers_str.push_str(&header.value);
279            headers_str.push_str(SYMBOL.new_line_carriage_return);
280        }
281        let head = format!("{}{}{}", status, headers_str, SYMBOL.new_line_carriage_return);
282
283        stream.write_all(head.as_bytes()).map_err(|e| e.to_string())?;
284
285        if request.method != METHOD.head && request.method != METHOD.options {
286            let mut file = File::open(filepath).map_err(|e| e.to_string())?;
287            let mut buf = vec![0u8; 65536];
288            loop {
289                let n = file.read(&mut buf).map_err(|e| e.to_string())?;
290                if n == 0 { break; }
291                // chunk header: hex size + CRLF
292                stream.write_all(format!("{:x}\r\n", n).as_bytes()).map_err(|e| e.to_string())?;
293                stream.write_all(&buf[..n]).map_err(|e| e.to_string())?;
294                stream.write_all(b"\r\n").map_err(|e| e.to_string())?;
295            }
296            // terminal chunk
297            stream.write_all(b"0\r\n\r\n").map_err(|e| e.to_string())?;
298        }
299
300        stream.flush().map_err(|e| e.to_string())
301    }
302
303    /// Streams a `Read` source to `stream` using HTTP/1.1 chunked transfer encoding.
304    ///
305    /// Called when `response.stream_pipe` is set (proxy passthrough for SSE, AI token
306    /// streams, and large downloads). The response headers are written first (with
307    /// `Transfer-Encoding: chunked` injected and any `Content-Length` stripped), then
308    /// bytes are read from `reader` and forwarded in 8 KB chunks. Each chunk is flushed
309    /// immediately so the client sees data as it arrives.
310    pub(crate) fn pipe_stream(
311        stream: &mut impl Write,
312        mut response: Response,
313        request: Request,
314        mut reader: Box<dyn std::io::Read + Send>,
315    ) -> Result<(), String> {
316        // If the backend already uses chunked encoding we pass the raw chunk
317        // frames through; otherwise we wrap the raw bytes in our own chunked
318        // framing so the client can receive data incrementally.
319        let backend_is_chunked = response.headers.iter().any(|h| {
320            h.name.eq_ignore_ascii_case("transfer-encoding")
321                && h.value.to_lowercase().contains("chunked")
322        });
323
324        response.headers.retain(|h| !h.name.eq_ignore_ascii_case("content-length"));
325        if !backend_is_chunked {
326            response.headers.push(Header {
327                name: Header::_TRANSFER_ENCODING.to_string(),
328                value: "chunked".to_string(),
329            });
330        }
331
332        let status = [
333            response.http_version.clone(),
334            response.status_code.to_string(),
335            response.reason_phrase.clone(),
336        ].join(SYMBOL.whitespace);
337
338        let mut head = format!("{}\r\n", status);
339        for header in &response.headers {
340            head.push_str(&header.name);
341            head.push_str(Header::NAME_VALUE_SEPARATOR);
342            head.push_str(&header.value);
343            head.push_str(SYMBOL.new_line_carriage_return);
344        }
345        head.push_str(SYMBOL.new_line_carriage_return);
346        stream.write_all(head.as_bytes()).map_err(|e| e.to_string())?;
347
348        if request.method != METHOD.head && request.method != METHOD.options {
349            let mut buf = [0u8; 8192];
350            if backend_is_chunked {
351                // Passthrough: forward raw chunk frames as-is; the client
352                // decodes them itself (avoids double-chunking).
353                loop {
354                    match reader.read(&mut buf) {
355                        Ok(0) | Err(_) => break,
356                        Ok(n) => {
357                            stream.write_all(&buf[..n]).map_err(|e| e.to_string())?;
358                            stream.flush().map_err(|e| e.to_string())?;
359                        }
360                    }
361                }
362            } else {
363                // Re-encode: wrap the raw byte stream in chunked framing so
364                // the client can receive each fragment as it arrives (SSE,
365                // large downloads).
366                loop {
367                    match reader.read(&mut buf) {
368                        Ok(0) | Err(_) => break,
369                        Ok(n) => {
370                            stream
371                                .write_all(format!("{:x}\r\n", n).as_bytes())
372                                .map_err(|e| e.to_string())?;
373                            stream.write_all(&buf[..n]).map_err(|e| e.to_string())?;
374                            stream.write_all(b"\r\n").map_err(|e| e.to_string())?;
375                            stream.flush().map_err(|e| e.to_string())?;
376                        }
377                    }
378                }
379                stream.write_all(b"0\r\n\r\n").map_err(|e| e.to_string())?;
380            }
381        }
382
383        stream.flush().map_err(|e| e.to_string())
384    }
385
386    /// Reads configuration (IP, port, thread count, TLS paths) from the layered config system
387    /// and returns a bound `TcpListener` and a sized `ThreadPool`. Call once at startup.
388    pub fn setup() -> Result<(TcpListener, ThreadPool), String> {
389        let info = Log::info("Rust Web Server");
390        println!("{}", info);
391
392        let usage_info = Log::usage_information();
393        println!("{}", usage_info);
394
395
396        println!("RWS Configuration Start: \n");
397
398        set_default_values();
399        bootstrap();
400
401        println!("\nRWS Configuration End\n\n");
402
403
404        let (ip, port, thread_count) = get_ip_port_thread_count();
405
406
407        let mut ip_readable = ip.to_string();
408
409        if ip.contains(":") {
410            ip_readable = [SYMBOL.opening_square_bracket, &ip, SYMBOL.closing_square_bracket].join("");
411        }
412
413        let bind_addr = [ip_readable, SYMBOL.colon.to_string(), port.to_string()].join(SYMBOL.empty_string);
414
415        #[cfg(feature = "http2")]
416        let protocol = {
417            let cert = std::env::var(crate::entry_point::Config::RWS_CONFIG_TLS_CERT_FILE).unwrap_or_default();
418            if cert.is_empty() { "http" } else { "https" }
419        };
420        #[cfg(not(feature = "http2"))]
421        let protocol = "http";
422
423        println!("Setting up {}://{}...", protocol, &bind_addr);
424
425        let boxed_listener = TcpListener::bind(&bind_addr);
426        if boxed_listener.is_err() {
427            let message = format!("unable to set up TCP listener: {}", boxed_listener.err().unwrap());
428            return Err(message);
429        }
430
431        let listener = boxed_listener.unwrap();
432        let pool = ThreadPool::new(thread_count as usize);
433
434
435        let server_url_thread_count = Log::server_url_thread_count(protocol, &bind_addr, thread_count);
436        println!("{}", server_url_thread_count);
437
438        Ok((listener, pool))
439    }
440
441    /// Accepts TCP connections in a loop and dispatches each to the thread pool.
442    ///
443    /// When built with the `http1` feature, Ctrl+C and SIGTERM stop the accept
444    /// loop gracefully: `SERVER_READY` is cleared and the pool drains all
445    /// in-flight connections before returning.
446    ///
447    /// For TLS/HTTP2/HTTP3 use [`Server::run_tls`].
448    pub fn run(listener: TcpListener,
449               pool: ThreadPool,
450               app: impl Application + Send + 'static + Clone) {
451        #[cfg(feature = "http1")]
452        {
453            use std::sync::Arc;
454            use std::sync::atomic::{AtomicBool, Ordering};
455
456            let shutdown = Arc::new(AtomicBool::new(false));
457            let s = shutdown.clone();
458            if let Err(e) = ctrlc::set_handler(move || {
459                s.store(true, Ordering::SeqCst);
460            }) {
461                eprintln!("unable to install signal handler: {}", e);
462            }
463            crate::config_reload::install_sighup_handler();
464            if let Err(e) = listener.set_nonblocking(true) {
465                eprintln!("unable to set non-blocking listener: {}", e);
466            }
467
468            loop {
469                if shutdown.load(Ordering::SeqCst) {
470                    break;
471                }
472                if crate::config_reload::RELOAD_REQUESTED
473                    .compare_exchange(true, false, Ordering::SeqCst, Ordering::Relaxed)
474                    .is_ok()
475                {
476                    crate::config_reload::reload();
477                }
478                match listener.accept() {
479                    Ok((stream, peer_addr)) => {
480                        Server::dispatch_connection(stream, peer_addr, &pool, app.clone());
481                    }
482                    Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
483                        std::thread::sleep(Duration::from_millis(10));
484                    }
485                    Err(e) => {
486                        eprintln!("accept error: {}", e);
487                        break;
488                    }
489                }
490            }
491
492            crate::metrics::SERVER_READY.store(false, std::sync::atomic::Ordering::SeqCst);
493            println!("Shutting down — waiting for in-flight connections to finish");
494            pool.join();
495            println!("Server stopped");
496        }
497
498        #[cfg(not(feature = "http1"))]
499        {
500            for boxed_stream in listener.incoming() {
501                match boxed_stream {
502                    Err(e) => {
503                        eprintln!("unable to get TCP stream: {}", e);
504                        return;
505                    }
506                    Ok(stream) => {
507                        let peer_addr = match stream.peer_addr() {
508                            Ok(a) => a,
509                            Err(e) => {
510                                eprintln!("unable to read peer addr: {}", e);
511                                return;
512                            }
513                        };
514                        Server::dispatch_connection(stream, peer_addr, &pool, app.clone());
515                    }
516                }
517            }
518        }
519    }
520
521    fn dispatch_connection(
522        stream: std::net::TcpStream,
523        peer_addr: std::net::SocketAddr,
524        pool: &ThreadPool,
525        app: impl Application + Send + 'static + Clone,
526    ) {
527        print!("Connection established, ");
528        if let Ok(local) = stream.local_addr() {
529            print!("local addr: {}", local);
530        }
531        println!(", peer addr: {}", peer_addr);
532
533        let (server_ip, server_port, _thread_count) = get_ip_port_thread_count();
534        let connection = ConnectionInfo {
535            client: Address {
536                ip: peer_addr.ip().to_string(),
537                port: peer_addr.port() as i32,
538            },
539            server: Address {
540                ip: server_ip,
541                port: server_port,
542            },
543            request_size: get_request_allocation_size(),
544            sni_hostname: None,
545        };
546
547        if let Err(e) = stream.set_read_timeout(Some(Duration::from_secs(30))) {
548            eprintln!("failed to set read timeout: {}", e);
549        }
550
551        pool.execute(move || {
552            crate::metrics::connection_open();
553            let result = Server::process(stream, connection, app);
554            crate::metrics::connection_close();
555            if let Err(msg) = result {
556                crate::metrics::record_error();
557                eprintln!("{}", msg);
558            }
559        });
560    }
561
562}
563
/// Per-connection network context handed to every [`Controller`](crate::controller::Controller).
#[derive(Clone)]
pub struct ConnectionInfo {
    /// Address of the remote peer (the client).
    pub client: Address,
    /// Address of the server (local) side.
    pub server: Address,
    /// Number of bytes allocated for reading the request.
    pub request_size: i64,
    /// Hostname the client sent via SNI during the TLS handshake.
    /// `None` on plain-HTTP connections or when the client sent no SNI.
    pub sni_hostname: Option<String>,
}

/// An IP address (kept as text) paired with a port number.
#[derive(Clone)]
pub struct Address {
    pub ip: String,
    pub port: i32
}

impl ConnectionInfo {
    /// Converts the stored client address into a [`std::net::SocketAddr`].
    /// Yields `None` when the IP text or the port is not valid.
    pub fn peer_addr(&self) -> Option<std::net::SocketAddr> {
        Address::to_socket_addr(&self.client)
    }
}

impl Address {
    /// Converts this address into a [`std::net::SocketAddr`].
    /// Yields `None` when the IP text does not parse or the port does not fit in a `u16`.
    pub fn to_socket_addr(&self) -> Option<std::net::SocketAddr> {
        match (self.ip.parse::<std::net::IpAddr>(), u16::try_from(self.port)) {
            (Ok(ip), Ok(port)) => Some(std::net::SocketAddr::new(ip, port)),
            _ => None,
        }
    }
}
602
/// Completes once SIGTERM is delivered on Unix; on other platforms it never completes.
/// This lets one `select!` branch cover SIGTERM next to the Ctrl+C branch.
///
/// If the signal listener cannot be registered, the future also stays pending forever.
#[cfg(feature = "http2")]
async fn sigterm() {
    #[cfg(unix)]
    {
        match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
            Ok(mut listener) => {
                listener.recv().await;
            }
            Err(_) => std::future::pending::<()>().await,
        }
    }
    #[cfg(not(unix))]
    std::future::pending::<()>().await
}
620
/// Completes once SIGHUP is delivered on Unix; on other platforms it never completes.
/// Each call waits for a single signal, so callers that want every SIGHUP call it again.
///
/// If the signal listener cannot be registered, the future also stays pending forever.
#[cfg(feature = "http2")]
async fn sighup() {
    #[cfg(unix)]
    {
        match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup()) {
            Ok(mut listener) => {
                listener.recv().await;
            }
            Err(_) => std::future::pending::<()>().await,
        }
    }
    #[cfg(not(unix))]
    std::future::pending::<()>().await
}
637
638#[cfg(feature = "http2")]
639impl Server {
640    pub async fn run_tls(
641        listener: TcpListener,
642        pool: ThreadPool,
643        app: impl Application + Send + 'static + Clone,
644    ) {
645        use crate::tls::create_tls_acceptor_from_vhosts;
646        use crate::h2_handler;
647
648        let cert_path = std::env::var(crate::entry_point::Config::RWS_CONFIG_TLS_CERT_FILE)
649            .unwrap_or_default();
650        let key_path = std::env::var(crate::entry_point::Config::RWS_CONFIG_TLS_KEY_FILE)
651            .unwrap_or_default();
652
653        if cert_path.is_empty() || key_path.is_empty() {
654            println!("No TLS certificate configured — serving plain HTTP/1.1.");
655            tokio::task::block_in_place(|| Server::run(listener, pool, app));
656            return;
657        }
658
659        let vhosts = crate::entry_point::get_virtual_hosts();
660        let mut tls_acceptor = match create_tls_acceptor_from_vhosts(&vhosts, &cert_path, &key_path) {
661            Ok(a) => a,
662            Err(e) => {
663                eprintln!("TLS setup failed: {}", e);
664                return;
665            }
666        };
667
668        listener
669            .set_nonblocking(true)
670            .expect("failed to set TCP listener to non-blocking");
671        let tokio_listener = tokio::net::TcpListener::from_std(listener)
672            .expect("failed to convert TCP listener to tokio");
673
674        println!("Listening for TLS connections (HTTP/1.1 + HTTP/2)...");
675
676        loop {
677            tokio::select! {
678                result = tokio_listener.accept() => {
679                    match result {
680                        Ok((tcp_stream, peer_addr)) => {
681                            let acceptor = tls_acceptor.clone();
682                            let app = app.clone();
683                            tokio::spawn(async move {
684                                match acceptor.accept(tcp_stream).await {
685                                    Ok(tls_stream) => {
686                                        let server_conn = tls_stream.get_ref().1;
687                                        let sni = server_conn.server_name().map(|s| s.to_string());
688                                        let protocol = server_conn
689                                            .alpn_protocol()
690                                            .map(|p| p.to_vec());
691
692                                        match protocol.as_deref() {
693                                            Some(b"h2") => {
694                                                if let Err(e) =
695                                                    h2_handler::handle_connection(tls_stream, peer_addr, sni, app)
696                                                        .await
697                                                {
698                                                    eprintln!("H2 connection error: {}", e);
699                                                }
700                                            }
701                                            _ => {
702                                                if let Err(e) =
703                                                    Server::process_h1_tls(tls_stream, peer_addr, sni, app).await
704                                                {
705                                                    eprintln!("H1 TLS error: {}", e);
706                                                }
707                                            }
708                                        }
709                                    }
710                                    Err(e) => eprintln!("TLS handshake failed: {}", e),
711                                }
712                            });
713                        }
714                        Err(e) => eprintln!("TCP accept error: {}", e),
715                    }
716                }
717                _ = tokio::signal::ctrl_c() => {
718                    crate::metrics::SERVER_READY.store(false, std::sync::atomic::Ordering::SeqCst);
719                    println!("\nShutting down gracefully (SIGINT).");
720                    break;
721                }
722                _ = sigterm() => {
723                    crate::metrics::SERVER_READY.store(false, std::sync::atomic::Ordering::SeqCst);
724                    println!("\nShutting down gracefully (SIGTERM).");
725                    break;
726                }
727                _ = sighup() => {
728                    crate::config_reload::reload();
729                    let vhosts = crate::entry_point::get_virtual_hosts();
730                    if let Ok(new_acceptor) = create_tls_acceptor_from_vhosts(&vhosts, &cert_path, &key_path) {
731                        tls_acceptor = new_acceptor;
732                        println!("[TLS] Certificates reloaded ({} virtual hosts).", vhosts.len());
733                    }
734                }
735            }
736        }
737    }
738
739    /// Binds a plain-HTTP listener on the port in `RWS_CONFIG_HTTP_REDIRECT_PORT` and sends
740    /// `301 Moved Permanently` to the HTTPS equivalent of every incoming URL.
741    /// Returns immediately if TLS is not configured or the redirect port is not set.
742    pub async fn run_redirect() {
743        use std::env;
744        use tokio::io::{AsyncReadExt, AsyncWriteExt};
745        use tokio::net::TcpListener as TokioListener;
746
747        let cert_path = env::var(crate::entry_point::Config::RWS_CONFIG_TLS_CERT_FILE)
748            .unwrap_or_default();
749        if cert_path.is_empty() {
750            return;
751        }
752
753        let redirect_port_str = env::var(crate::entry_point::Config::RWS_CONFIG_HTTP_REDIRECT_PORT)
754            .unwrap_or_default();
755        if redirect_port_str.is_empty() {
756            return;
757        }
758
759        let redirect_port: u16 = match redirect_port_str.parse() {
760            Ok(p) => p,
761            Err(_) => {
762                eprintln!("Invalid RWS_CONFIG_HTTP_REDIRECT_PORT: {}", redirect_port_str);
763                return;
764            }
765        };
766
767        let (server_ip, server_port, _) = get_ip_port_thread_count();
768        let bind_addr = format!("{}:{}", server_ip, redirect_port);
769
770        let listener = match TokioListener::bind(&bind_addr).await {
771            Ok(l) => l,
772            Err(e) => {
773                eprintln!("HTTP redirect listener error on {}: {}", bind_addr, e);
774                return;
775            }
776        };
777
778        println!("HTTP→HTTPS redirect listening on http://{}:{}", server_ip, redirect_port);
779
780        loop {
781            tokio::select! {
782                result = listener.accept() => {
783                    match result {
784                        Ok((mut stream, _peer)) => {
785                            let https_port = server_port;
786                            tokio::spawn(async move {
787                                let mut buf = vec![0u8; 4096];
788                                let n = match stream.read(&mut buf).await {
789                                    Ok(n) => n,
790                                    Err(_) => return,
791                                };
792                                let text = String::from_utf8_lossy(&buf[..n]);
793
794                                let uri = text.lines()
795                                    .next()
796                                    .and_then(|line| line.split_whitespace().nth(1))
797                                    .unwrap_or("/")
798                                    .to_string();
799
800                                let host_header = text.lines()
801                                    .find(|l| l.to_lowercase().starts_with("host:"))
802                                    .map(|l| l[5..].trim().to_string());
803
804                                let location = match host_header {
805                                    Some(h) => {
806                                        // strip existing port from Host header
807                                        let h_no_port = if h.starts_with('[') {
808                                            // IPv6: [::1] or [::1]:port
809                                            h.find(']')
810                                                .map(|i| h[..=i].to_string())
811                                                .unwrap_or(h.clone())
812                                        } else {
813                                            h.rfind(':')
814                                                .map(|i| h[..i].to_string())
815                                                .unwrap_or(h.clone())
816                                        };
817                                        if https_port == 443 {
818                                            format!("https://{}{}", h_no_port, uri)
819                                        } else {
820                                            format!("https://{}:{}{}", h_no_port, https_port, uri)
821                                        }
822                                    }
823                                    None => format!("https://localhost:{}{}", https_port, uri),
824                                };
825
826                                let response = format!(
827                                    "HTTP/1.1 301 Moved Permanently\r\nLocation: {}\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
828                                    location
829                                );
830                                let _ = stream.write_all(response.as_bytes()).await;
831                            });
832                        }
833                        Err(e) => eprintln!("HTTP redirect accept error: {}", e),
834                    }
835                }
836                _ = tokio::signal::ctrl_c() => {
837                    println!("\nShutting down HTTP redirect listener (SIGINT).");
838                    break;
839                }
840                _ = sigterm() => {
841                    println!("\nShutting down HTTP redirect listener (SIGTERM).");
842                    break;
843                }
844                _ = sighup() => {
845                    crate::config_reload::reload();
846                }
847            }
848        }
849    }
850
851    async fn process_h1_tls(
852        mut stream: tokio_rustls::server::TlsStream<tokio::net::TcpStream>,
853        peer_addr: std::net::SocketAddr,
854        sni_hostname: Option<String>,
855        app: impl Application,
856    ) -> Result<(), String> {
857        use tokio::io::{AsyncReadExt, AsyncWriteExt};
858
859        let (server_ip, server_port, _) = get_ip_port_thread_count();
860        let request_allocation_size = get_request_allocation_size();
861
862        let mut buffer = vec![0u8; request_allocation_size as usize];
863        let n = match stream.read(&mut buffer).await {
864            Ok(n) => n,
865            Err(e) => {
866                let raw = Server::bad_request_response(e.to_string());
867                let _ = stream.write_all(&raw).await;
868                return Ok(());
869            }
870        };
871
872        // Trim to the bytes actually read — passing the zero-padded rest of
873        // `buffer` to Request::parse would append trailing NUL bytes to the body.
874        let mut request = match Request::parse(&buffer[..n]) {
875            Ok(r) => r,
876            Err(message) => {
877                let raw = Server::bad_request_response(message);
878                let _ = stream.write_all(&raw).await;
879                return Ok(());
880            }
881        };
882
883        // If Content-Length declares more body than this one read delivered,
884        // keep reading until it's fully received — a single `request_allocation_size`
885        // read is not big enough for bodies larger than that buffer.
886        if let Some(declared_len) = request
887            .get_header(Header::_CONTENT_LENGTH.to_string())
888            .and_then(|h| h.value.trim().parse::<u64>().ok())
889        {
890            let max_body_size = crate::entry_point::get_max_body_size();
891            if max_body_size > 0 && declared_len > max_body_size {
892                let raw = Server::payload_too_large_response(format!(
893                    "413 Payload Too Large: declared Content-Length {} exceeds the {} byte limit",
894                    declared_len, max_body_size
895                ));
896                let _ = stream.write_all(&raw).await;
897                return Ok(());
898            }
899
900            while (request.body.len() as u64) < declared_len {
901                let mut more = vec![0u8; request_allocation_size as usize];
902                match stream.read(&mut more).await {
903                    Ok(0) | Err(_) => break, // peer closed early; proceed with the short body
904                    Ok(k) => request.body.extend_from_slice(&more[..k]),
905                }
906            }
907        }
908
909        let connection = ConnectionInfo {
910            client: Address {
911                ip: peer_addr.ip().to_string(),
912                port: peer_addr.port() as i32,
913            },
914            server: Address {
915                ip: server_ip,
916                port: server_port,
917            },
918            request_size: request_allocation_size,
919            sni_hostname,
920        };
921
922        let mut response = match app.execute(&request, &connection) {
923            Ok(r) => r,
924            Err(message) => {
925                let raw = Server::bad_request_response(message);
926                let _ = stream.write_all(&raw).await;
927                return Ok(());
928            }
929        };
930
931        crate::metrics::record_request();
932        crate::compression::apply_gzip(&request, &mut response);
933        response.headers.push(Header::get_hsts_header());
934
935        #[cfg(feature = "http3")]
936        response.headers.push(Header {
937            name: Header::_ALT_SVC.to_string(),
938            value: format!("h3=\":{}\"", server_port),
939        });
940        #[cfg(not(feature = "http3"))]
941        response.headers.push(Header {
942            name: Header::_ALT_SVC.to_string(),
943            value: format!("h2=\":{}\"", server_port),
944        });
945
946        Log::log_access(&request, &response, &peer_addr);
947
948        let raw = Response::generate_response(response, request);
949        stream
950            .write_all(&raw)
951            .await
952            .map_err(|e| e.to_string())?;
953        stream.flush().await.map_err(|e| e.to_string())?;
954
955        Ok(())
956    }
957}
958
#[cfg(feature = "http3")]
impl Server {
    /// Runs the QUIC / HTTP/3 listener on the same IP and port number as the main server.
    ///
    /// Returns immediately when the TLS certificate or key path is not configured, and
    /// returns after logging to stderr when the QUIC TLS configuration, the bind address
    /// or the endpoint cannot be set up.
    ///
    /// Each incoming connection is completed and handed to `h3_handler::handle_connection`
    /// in its own task with a clone of `app`. The accept loop ends when the endpoint stops
    /// yielding connections or when `ctrl_c()` / `sigterm()` fires. On a signal the
    /// readiness flag is cleared, the endpoint is closed, and the function waits for the
    /// endpoint to become idle so peers get a chance to receive the close. `sighup()`
    /// triggers `config_reload::reload()` and the loop keeps running.
    pub async fn run_quic(
        app: impl Application + Send + 'static + Clone,
    ) {
        use crate::tls::create_quinn_server_config_from_vhosts;
        use crate::h3_handler;

        let cert_path = std::env::var(crate::entry_point::Config::RWS_CONFIG_TLS_CERT_FILE)
            .unwrap_or_default();
        let key_path = std::env::var(crate::entry_point::Config::RWS_CONFIG_TLS_KEY_FILE)
            .unwrap_or_default();

        if cert_path.is_empty() || key_path.is_empty() {
            return;
        }

        let vhosts = crate::entry_point::get_virtual_hosts();
        let server_config = match create_quinn_server_config_from_vhosts(&vhosts, &cert_path, &key_path) {
            Ok(c) => c,
            Err(e) => {
                eprintln!("QUIC TLS setup failed: {}", e);
                return;
            }
        };

        let (server_ip, server_port, _) = get_ip_port_thread_count();
        let bind_addr = format!("{}:{}", server_ip, server_port);
        // Resolve the address the way a TCP listener bind on the same string would:
        // a literal `ip:port` is parsed directly, while bare IPv6 literals and host
        // names go through the resolver instead of being rejected.
        let resolved = tokio::net::lookup_host(&bind_addr)
            .await
            .map(|mut addrs| addrs.next());
        let addr: std::net::SocketAddr = match resolved {
            Ok(Some(a)) => a,
            Ok(None) => {
                eprintln!("Invalid QUIC bind address '{}': no address resolved", bind_addr);
                return;
            }
            Err(e) => {
                eprintln!("Invalid QUIC bind address '{}': {}", bind_addr, e);
                return;
            }
        };

        let endpoint = match quinn::Endpoint::server(server_config, addr) {
            Ok(e) => e,
            Err(e) => {
                eprintln!("QUIC endpoint error: {}", e);
                return;
            }
        };

        println!("Listening for QUIC/HTTP3 on UDP {}:{}", server_ip, server_port);

        // `Some(name)` when a termination signal ended the loop, `None` when the
        // endpoint itself stopped yielding connections.
        let shutdown_signal = loop {
            tokio::select! {
                maybe = endpoint.accept() => {
                    match maybe {
                        Some(incoming) => {
                            let app = app.clone();
                            tokio::spawn(async move {
                                match incoming.await {
                                    Ok(conn) => {
                                        let peer_addr = conn.remote_address();
                                        if let Err(e) = h3_handler::handle_connection(conn, peer_addr, app).await {
                                            eprintln!("H3 connection error: {}", e);
                                        }
                                    }
                                    Err(e) => eprintln!("QUIC connection error: {}", e),
                                }
                            });
                        }
                        None => {
                            break None;
                        }
                    }
                }
                _ = tokio::signal::ctrl_c() => {
                    break Some("SIGINT");
                }
                _ = sigterm() => {
                    break Some("SIGTERM");
                }
                _ = sighup() => {
                    crate::config_reload::reload();
                }
            }
        };

        if let Some(signal) = shutdown_signal {
            crate::metrics::SERVER_READY.store(false, std::sync::atomic::Ordering::SeqCst);
            println!("\nShutting down QUIC ({}).", signal);
            endpoint.close(0u32.into(), b"shutdown");
            // Give the endpoint a chance to deliver the close to connected peers;
            // returning right away could leave them waiting for their idle timeout.
            endpoint.wait_idle().await;
        }
    }
}
1045
1046