1use crate::error::ServerError;
15use crate::request::Request;
16use crate::response::Response;
17use crossbeam_channel::{Receiver, Sender, unbounded};
18use serde::{Deserialize, Serialize};
19use std::collections::HashMap;
20use std::fs;
21use std::io;
22use std::net::{IpAddr, TcpListener, TcpStream};
23use std::path::{Path, PathBuf};
24use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
25#[cfg(test)]
26use std::sync::mpsc;
27use std::sync::{Arc, Mutex, Once, OnceLock, RwLock};
28use std::thread;
29use std::time::{Duration, Instant, UNIX_EPOCH};
30
/// Process-wide slot holding the shutdown signal that the Ctrl+C handler
/// triggers. `Server::install_signal_handlers` overwrites the slot on every
/// call; `Server::handle_shutdown_signal` reads it.
static SHUTDOWN_SIGNAL_SLOT: OnceLock<
    Mutex<Option<Arc<ShutdownSignal>>>,
> = OnceLock::new();
/// Ensures `ctrlc::set_handler` is invoked at most once per process.
static SIGNAL_HANDLER_INSTALL: Once = Once::new();
/// Number of independently locked shards in the rate-limit table.
///
/// `rate_limit_shard_index` derives the shard with a bitmask of
/// `RATE_LIMIT_SHARDS - 1`, so this value must stay a power of two.
const RATE_LIMIT_SHARDS: usize = 16;
/// One shard: per-client-IP timestamps of recently admitted requests.
type RateLimitShard = Mutex<HashMap<IpAddr, Vec<Instant>>>;
/// The full rate-limit table, one mutex-protected map per shard.
type RateLimitTable = [RateLimitShard; RATE_LIMIT_SHARDS];
/// Lazily initialised global rate-limit table (see `rate_limit_table`).
static RATE_LIMIT_STATE: OnceLock<RateLimitTable> = OnceLock::new();

/// Entry count at which `compute_etag` evicts a quarter of the ETag cache
/// before inserting a new entry.
const ETAG_CACHE_MAX: usize = 256;
/// ETag strings keyed by `(file length, modification time in seconds)`.
type EtagCache = RwLock<HashMap<(u64, u64), Arc<str>>>;
/// Lazily initialised global ETag cache (see `etag_cache`).
static ETAG_CACHE: OnceLock<EtagCache> = OnceLock::new();
/// Count of responses passed through `record_metrics`.
static METRIC_REQUESTS_TOTAL: AtomicUsize = AtomicUsize::new(0);
/// Count of recorded responses whose status code is in `400..500`.
static METRIC_RESPONSES_4XX: AtomicUsize = AtomicUsize::new(0);
/// Count of recorded responses whose status code is `>= 500`.
static METRIC_RESPONSES_5XX: AtomicUsize = AtomicUsize::new(0);
/// Count of requests rejected by the per-IP rate limiter.
static METRIC_RATE_LIMITED: AtomicUsize = AtomicUsize::new(0);
54
/// Configuration for a static-file HTTP server.
///
/// Instances are created with [`Server::new`] or through [`ServerBuilder`].
/// Every `Option` field left as `None` falls back to the default chosen by
/// the code that reads it.
#[doc(alias = "http server")]
#[doc(alias = "static file server")]
#[derive(
    Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize,
)]
pub struct Server {
    /// Address handed to `TcpListener::bind`, e.g. `"127.0.0.1:8080"`.
    address: String,
    /// Directory that request paths are resolved against.
    document_root: PathBuf,
    /// `fs::canonicalize(document_root)` captured at construction time, or a
    /// copy of `document_root` when canonicalization failed. Skipped by
    /// serde, so it is empty on a deserialized value; the
    /// `canonical_document_root()` accessor falls back to `document_root`
    /// in that case.
    #[serde(skip, default)]
    canonical_document_root: PathBuf,
    /// CORS toggle as configured by the builder.
    // NOTE(review): consumed by response-policy code outside this view — confirm semantics there.
    cors_enabled: Option<bool>,
    /// Allowed CORS origins as configured by the builder.
    cors_origins: Option<Vec<String>>,
    /// Extra response headers as configured by the builder.
    custom_headers: Option<HashMap<String, String>>,
    /// Socket read/write timeout used by `handle_connection`
    /// (30 seconds when unset).
    request_timeout: Option<Duration>,
    /// Socket read/write timeout used by `handle_connection_tracked`
    /// (30 seconds when unset).
    connection_timeout: Option<Duration>,
    /// Maximum requests admitted per client IP within a 60-second window;
    /// `None` disables rate limiting.
    rate_limit_per_minute: Option<usize>,
    /// When set, served files carry `Cache-Control: public, max-age=<ttl>`.
    static_cache_ttl_secs: Option<u64>,
    /// Largest file size served from memory; defaults to
    /// `DEFAULT_MAX_BUFFERED_BODY_BYTES` when unset.
    max_buffered_body_bytes: Option<u64>,
}
103
/// Incrementally assembles a [`Server`].
///
/// Every field starts as `None`. `address` and `document_root` must be set
/// before [`ServerBuilder::build`] succeeds; all other fields are copied
/// into the resulting `Server` unchanged.
#[doc(alias = "builder")]
#[doc(alias = "configuration")]
#[derive(Clone, Debug, Default)]
pub struct ServerBuilder {
    /// Bind address; required by `build`.
    address: Option<String>,
    /// Document root directory; required by `build`.
    document_root: Option<PathBuf>,
    /// CORS toggle; also set to `Some(true)` by `cors_origins`.
    cors_enabled: Option<bool>,
    /// Allowed CORS origins.
    cors_origins: Option<Vec<String>>,
    /// Extra response headers.
    custom_headers: Option<HashMap<String, String>>,
    /// Timeout forwarded to `Server::request_timeout`.
    request_timeout: Option<Duration>,
    /// Timeout forwarded to `Server::connection_timeout`.
    connection_timeout: Option<Duration>,
    /// Per-IP request budget per minute (the setter clamps it to at least 1).
    rate_limit_per_minute: Option<usize>,
    /// `Cache-Control` max-age, in seconds, for served files.
    static_cache_ttl_secs: Option<u64>,
    /// Cap, in bytes, on files served from memory.
    max_buffered_body_bytes: Option<u64>,
}
146
147impl ServerBuilder {
148 #[doc(alias = "new builder")]
164 pub fn new() -> Self {
165 Self::default()
166 }
167
168 #[doc(alias = "bind address")]
187 pub fn address(mut self, address: &str) -> Self {
188 self.address = Some(address.to_string());
189 self
190 }
191
192 #[doc(alias = "document root")]
211 pub fn document_root(mut self, path: &str) -> Self {
212 self.document_root = Some(PathBuf::from(path));
213 self
214 }
215
216 pub fn enable_cors(mut self) -> Self {
218 self.cors_enabled = Some(true);
219 self
220 }
221
222 pub fn disable_cors(mut self) -> Self {
224 self.cors_enabled = Some(false);
225 self
226 }
227
228 pub fn cors_origins(mut self, origins: Vec<String>) -> Self {
230 self.cors_origins = Some(origins);
231 self.cors_enabled = Some(true); self
233 }
234
235 pub fn custom_header(mut self, name: &str, value: &str) -> Self {
237 let mut headers = self.custom_headers.unwrap_or_default();
238 let _ = headers.insert(name.to_string(), value.to_string());
239 self.custom_headers = Some(headers);
240 self
241 }
242
243 pub fn custom_headers(
245 mut self,
246 headers: HashMap<String, String>,
247 ) -> Self {
248 self.custom_headers = Some(headers);
249 self
250 }
251
252 pub fn request_timeout(mut self, timeout: Duration) -> Self {
254 self.request_timeout = Some(timeout);
255 self
256 }
257
258 pub fn connection_timeout(mut self, timeout: Duration) -> Self {
260 self.connection_timeout = Some(timeout);
261 self
262 }
263
264 pub fn rate_limit_per_minute(mut self, requests: usize) -> Self {
266 self.rate_limit_per_minute = Some(requests.max(1));
267 self
268 }
269
270 pub fn static_cache_ttl_secs(mut self, ttl: u64) -> Self {
272 self.static_cache_ttl_secs = Some(ttl);
273 self
274 }
275
276 pub fn max_buffered_body_bytes(mut self, bytes: u64) -> Self {
298 self.max_buffered_body_bytes = Some(bytes);
299 self
300 }
301
302 #[doc(alias = "finalize")]
326 pub fn build(self) -> Result<Server, &'static str> {
327 let address = self.address.ok_or("Address is required")?;
328 let document_root =
329 self.document_root.ok_or("Document root is required")?;
330 let canonical_document_root = fs::canonicalize(&document_root)
333 .unwrap_or_else(|_| document_root.clone());
334
335 Ok(Server {
336 address,
337 document_root,
338 canonical_document_root,
339 cors_enabled: self.cors_enabled,
340 cors_origins: self.cors_origins,
341 custom_headers: self.custom_headers,
342 request_timeout: self.request_timeout,
343 connection_timeout: self.connection_timeout,
344 rate_limit_per_minute: self.rate_limit_per_minute,
345 static_cache_ttl_secs: self.static_cache_ttl_secs,
346 max_buffered_body_bytes: self.max_buffered_body_bytes,
347 })
348 }
349}
350
/// Shared state used to coordinate a graceful shutdown: a "stop" flag, a
/// counter of in-flight connections, and the maximum time to wait for them.
///
/// Cloning shares the flag and the counter (both are `Arc`s).
#[derive(Debug, Clone)]
pub struct ShutdownSignal {
    /// Set to `true` once shutdown has been requested.
    pub should_shutdown: Arc<AtomicBool>,
    /// Number of connections currently being handled.
    pub active_connections: Arc<AtomicUsize>,
    /// Upper bound on how long `wait_for_shutdown` blocks.
    pub shutdown_timeout: Duration,
}

impl Default for ShutdownSignal {
    /// Creates a signal with a 30-second shutdown timeout.
    fn default() -> Self {
        Self::new(Duration::from_secs(30))
    }
}

impl ShutdownSignal {
    /// Creates a signal with no shutdown requested and no active
    /// connections.
    pub fn new(shutdown_timeout: Duration) -> Self {
        Self {
            should_shutdown: Arc::new(AtomicBool::new(false)),
            active_connections: Arc::new(AtomicUsize::new(0)),
            shutdown_timeout,
        }
    }

    /// Requests shutdown and prints a notice.
    pub fn shutdown(&self) {
        self.should_shutdown.store(true, Ordering::SeqCst);
        println!(
            "🛑 Shutdown signal received. Waiting for active connections to finish..."
        );
    }

    /// Returns `true` once `shutdown` has been called.
    pub fn is_shutdown_requested(&self) -> bool {
        self.should_shutdown.load(Ordering::SeqCst)
    }

    /// Records that a connection has started being handled.
    pub fn connection_started(&self) {
        let _ = self.active_connections.fetch_add(1, Ordering::SeqCst);
    }

    /// Records that a connection has finished.
    ///
    /// The counter saturates at zero: an unmatched call is ignored instead
    /// of wrapping to `usize::MAX`, which would make `wait_for_shutdown`
    /// block for the full timeout.
    pub fn connection_finished(&self) {
        // Same lint allowance the file already applies to `fetch_update`
        // in `ConnectionPool::acquire`.
        #[allow(deprecated_in_future)]
        let _ = self.active_connections.fetch_update(
            Ordering::SeqCst,
            Ordering::SeqCst,
            |current| current.checked_sub(1),
        );
    }

    /// Returns the number of connections currently being handled.
    pub fn active_connection_count(&self) -> usize {
        self.active_connections.load(Ordering::SeqCst)
    }

    /// Blocks until every active connection has finished or
    /// `shutdown_timeout` has elapsed, polling every 50 ms.
    ///
    /// Returns `true` when all connections finished in time and `false`
    /// when the timeout was reached with connections still active.
    pub fn wait_for_shutdown(&self) -> bool {
        const POLL_INTERVAL: Duration = Duration::from_millis(50);
        // Progress is reported at most once per second so that a slow
        // drain does not flood the log at the polling frequency.
        const REPORT_INTERVAL: Duration = Duration::from_secs(1);

        let start_time = Instant::now();
        let mut last_report: Option<Instant> = None;

        loop {
            let active = self.active_connection_count();
            let elapsed = start_time.elapsed();
            if active == 0 || elapsed >= self.shutdown_timeout {
                break;
            }
            let remaining = self.shutdown_timeout.saturating_sub(elapsed);

            let report_due = match last_report {
                Some(at) => at.elapsed() >= REPORT_INTERVAL,
                None => true,
            };
            if report_due {
                println!(
                    "⏳ Waiting for {} active connection(s) to finish... ({:.1}s remaining)",
                    active,
                    remaining.as_secs_f32()
                );
                last_report = Some(Instant::now());
            }

            thread::sleep(remaining.min(POLL_INTERVAL));
        }

        let remaining_connections = self.active_connection_count();
        if remaining_connections > 0 {
            println!(
                "⚠️ Shutdown timeout reached. {} connection(s) will be forcibly terminated.",
                remaining_connections
            );
            false
        } else {
            println!("✅ All connections closed gracefully.");
            true
        }
    }
}
439
/// A fixed-size pool of worker threads fed through a shared job channel.
pub struct ThreadPool {
    /// The spawned workers; their threads are joined when the pool drops.
    workers: Vec<Worker>,
    /// Sending half of the job channel shared by all workers.
    sender: Sender<Job>,
}
445
446impl std::fmt::Debug for ThreadPool {
447 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
448 f.debug_struct("ThreadPool")
449 .field("workers", &self.workers)
450 .field("sender", &"<Sender<Job>>")
451 .finish()
452 }
453}
454
/// A unit of work executed by a pool worker: a boxed one-shot closure that
/// can be sent to another thread.
type Job = Box<dyn FnOnce() + Send + 'static>;

/// One pool worker: its numeric id plus the handle of its thread.
#[derive(Debug)]
struct Worker {
    /// Index assigned by `ThreadPool::new`; used in log messages.
    id: usize,
    /// Join handle of the worker thread; taken (left as `None`) when the
    /// pool joins it during drop.
    thread: Option<thread::JoinHandle<()>>,
}
464
465impl ThreadPool {
466 pub fn new(size: usize) -> ThreadPool {
474 assert!(size > 0);
475
476 let (sender, receiver) = unbounded();
481
482 let mut workers = Vec::with_capacity(size);
483
484 for id in 0..size {
485 workers.push(Worker::new(id, receiver.clone()));
486 }
487
488 ThreadPool { workers, sender }
490 }
491
492 pub fn execute<F>(&self, f: F)
497 where
498 F: FnOnce() + Send + 'static,
499 {
500 let job = Box::new(f);
501 self.sender.send(job).unwrap();
502 }
503}
504
505impl Drop for ThreadPool {
506 fn drop(&mut self) {
507 let (replacement_sender, _replacement_receiver) = unbounded();
512 let old_sender =
513 std::mem::replace(&mut self.sender, replacement_sender);
514 drop(old_sender);
515
516 for worker in &mut self.workers {
517 println!("Shutting down worker {}", worker.id);
518
519 if let Some(thread) = worker.thread.take() {
520 thread.join().unwrap();
521 }
522 }
523 }
524}
525
526impl Worker {
527 fn new(id: usize, receiver: Receiver<Job>) -> Worker {
528 let thread = thread::spawn(move || {
529 while let Ok(job) = receiver.recv() {
530 job();
531 }
532 println!("Worker {id} disconnected; shutting down.");
533 });
534
535 Worker {
536 id,
537 thread: Some(thread),
538 }
539 }
540}
541
/// Caps the number of connections handled at the same time.
#[derive(Debug)]
pub struct ConnectionPool {
    /// Maximum number of guards that may be alive at once.
    max_connections: usize,
    /// Number of guards currently alive; shared with every guard.
    active_connections: Arc<AtomicUsize>,
}

impl ConnectionPool {
    /// Creates a pool that admits at most `max_connections` guards.
    pub fn new(max_connections: usize) -> Self {
        let active_connections = Arc::new(AtomicUsize::new(0));
        Self {
            max_connections,
            active_connections,
        }
    }

    /// Reserves one slot, returning a guard that frees it when dropped.
    ///
    /// # Errors
    ///
    /// Returns an `io::ErrorKind::WouldBlock` error when every slot is
    /// taken.
    pub fn acquire(&self) -> Result<ConnectionGuard, io::Error> {
        let limit = self.max_connections;
        // Increment only while below the limit; the closure declining
        // (`None`) surfaces as `Err` from `fetch_update`.
        #[allow(deprecated_in_future)]
        let claim = self.active_connections.fetch_update(
            Ordering::SeqCst,
            Ordering::SeqCst,
            |in_use| {
                if in_use >= limit {
                    None
                } else {
                    Some(in_use + 1)
                }
            },
        );
        match claim {
            Ok(_) => Ok(ConnectionGuard {
                pool: Arc::clone(&self.active_connections),
            }),
            Err(_) => Err(io::Error::new(
                io::ErrorKind::WouldBlock,
                "Connection pool exhausted",
            )),
        }
    }

    /// Returns the number of slots currently reserved.
    pub fn active_count(&self) -> usize {
        self.active_connections.load(Ordering::SeqCst)
    }
}

/// Proof of a reserved [`ConnectionPool`] slot; dropping it releases the
/// slot.
#[derive(Debug)]
pub struct ConnectionGuard {
    /// The owning pool's active-connection counter.
    pool: Arc<AtomicUsize>,
}

impl Drop for ConnectionGuard {
    /// Releases the slot by decrementing the pool's counter.
    fn drop(&mut self) {
        let _ = self.pool.fetch_sub(1, Ordering::SeqCst);
    }
}
601
602impl Server {
603 #[doc(alias = "constructor")]
621 pub fn new(address: &str, document_root: &str) -> Self {
622 let document_root = PathBuf::from(document_root);
623 let canonical_document_root = fs::canonicalize(&document_root)
624 .unwrap_or_else(|_| document_root.clone());
625 Server {
626 address: address.to_string(),
627 document_root,
628 canonical_document_root,
629 cors_enabled: None,
630 cors_origins: None,
631 custom_headers: None,
632 request_timeout: None,
633 connection_timeout: None,
634 rate_limit_per_minute: None,
635 static_cache_ttl_secs: None,
636 max_buffered_body_bytes: None,
637 }
638 }
639
640 pub fn builder() -> ServerBuilder {
659 ServerBuilder::new()
660 }
661
662 #[doc(alias = "listen")]
684 #[doc(alias = "serve")]
685 pub fn start(&self) -> io::Result<()> {
686 let listener = TcpListener::bind(&self.address)?;
687 println!("❯ Server is now running at http://{}", self.address);
688 println!(" Document root: {}", self.document_root.display());
689 println!(" Press Ctrl+C to stop the server.");
690
691 Self::run_basic_accept_loop(listener.incoming(), self.clone());
692
693 Ok(())
694 }
695
696 #[doc(alias = "graceful shutdown")]
719 pub fn start_with_graceful_shutdown(
720 &self,
721 shutdown_timeout: Duration,
722 ) -> io::Result<()> {
723 let shutdown = Arc::new(ShutdownSignal::new(shutdown_timeout));
724 self.start_with_shutdown_signal(shutdown)
725 }
726
727 #[doc(alias = "shutdown signal")]
749 pub fn start_with_shutdown_signal(
750 &self,
751 shutdown: Arc<ShutdownSignal>,
752 ) -> io::Result<()> {
753 self.start_with_shutdown_signal_and_ready(shutdown, |_| {})
754 }
755
756 pub fn start_with_shutdown_signal_and_ready<F>(
770 &self,
771 shutdown: Arc<ShutdownSignal>,
772 on_ready: F,
773 ) -> io::Result<()>
774 where
775 F: FnOnce(String),
776 {
777 Self::install_signal_handlers(shutdown.clone());
779
780 let listener = TcpListener::bind(&self.address)?;
781 let bound_address = listener.local_addr()?.to_string();
782 on_ready(bound_address.clone());
783 println!("❯ Server is now running at http://{}", bound_address);
784 println!(" Document root: {}", self.document_root.display());
785 println!(" Press Ctrl+C to stop the server gracefully.");
786
787 listener.set_nonblocking(true)?;
789
790 const MIN_IDLE_SLEEP: Duration = Duration::from_micros(100);
796 const MAX_IDLE_SLEEP: Duration = Duration::from_millis(5);
797 let mut idle_sleep = MIN_IDLE_SLEEP;
798
799 loop {
800 if shutdown.is_shutdown_requested() {
802 println!(
803 "🛑 Shutdown requested. Stopping new connections..."
804 );
805 break;
806 }
807
808 match listener.accept() {
809 Ok((stream, _addr)) => {
810 idle_sleep = MIN_IDLE_SLEEP;
811 Self::run_tracked_accept(
812 stream,
813 self.clone(),
814 shutdown.clone(),
815 );
816 }
817 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
818 thread::sleep(idle_sleep);
819 idle_sleep = (idle_sleep * 2).min(MAX_IDLE_SLEEP);
820 }
821 Err(e) => Self::log_listener_error(e),
822 }
823 }
824
825 let graceful = shutdown.wait_for_shutdown();
827
828 if graceful {
829 println!("✅ Server shut down gracefully.");
830 } else {
831 println!(
832 "⚠️ Server shut down with active connections remaining."
833 );
834 }
835
836 Ok(())
837 }
838
839 fn install_signal_handlers(shutdown: Arc<ShutdownSignal>) {
845 let slot =
846 SHUTDOWN_SIGNAL_SLOT.get_or_init(|| Mutex::new(None));
847
848 if let Ok(mut guard) = slot.lock() {
850 *guard = Some(shutdown);
851 }
852
853 SIGNAL_HANDLER_INSTALL.call_once(|| {
855 let _ = ctrlc::set_handler(Self::handle_shutdown_signal);
856 });
857 }
858
859 fn handle_shutdown_signal() {
860 if let Some(slot) = SHUTDOWN_SIGNAL_SLOT.get() {
861 Self::trigger_shutdown_from_slot(slot);
862 }
863 }
864
865 fn trigger_shutdown_from_slot(
866 slot: &Mutex<Option<Arc<ShutdownSignal>>>,
867 ) {
868 if let Ok(guard) = slot.lock()
869 && let Some(shutdown_signal) = guard.as_ref()
870 {
871 shutdown_signal.shutdown();
872 }
873 }
874
875 pub fn start_with_thread_pool(
888 &self,
889 thread_pool_size: usize,
890 ) -> io::Result<()> {
891 let thread_pool = ThreadPool::new(thread_pool_size);
892 let listener = TcpListener::bind(&self.address)?;
893
894 println!("❯ Server is now running at http://{}", self.address);
895 println!(" Document root: {}", self.document_root.display());
896 println!(" Thread pool size: {} workers", thread_pool_size);
897 println!(" Press Ctrl+C to stop the server.");
898
899 Self::run_thread_pool_accept_loop(
900 listener.incoming(),
901 self.clone(),
902 &thread_pool,
903 );
904
905 Ok(())
906 }
907
908 pub fn start_with_pooling(
924 &self,
925 thread_pool_size: usize,
926 max_connections: usize,
927 ) -> io::Result<()> {
928 let thread_pool = ThreadPool::new(thread_pool_size);
929 let connection_pool =
930 Arc::new(ConnectionPool::new(max_connections));
931 let listener = TcpListener::bind(&self.address)?;
932
933 println!("❯ Server is now running at http://{}", self.address);
934 println!(" Document root: {}", self.document_root.display());
935 println!(" Thread pool size: {} workers", thread_pool_size);
936 println!(" Max concurrent connections: {}", max_connections);
937 println!(" Press Ctrl+C to stop the server.");
938
939 Self::run_pooling_accept_loop(
940 listener.incoming(),
941 self.clone(),
942 &thread_pool,
943 connection_pool,
944 );
945
946 Ok(())
947 }
948
949 fn log_connection_result(result: Result<(), ServerError>) {
950 if let Err(error) = result {
951 eprintln!("Error handling connection: {}", error);
952 }
953 }
954
955 fn log_listener_error(error: io::Error) {
956 eprintln!("Connection error: {}", error);
957 }
958
959 fn run_tracked_accept(
960 stream: TcpStream,
961 server: Server,
962 shutdown: Arc<ShutdownSignal>,
963 ) {
964 shutdown.connection_started();
965 let _ = thread::spawn(move || {
966 let result =
967 handle_connection_tracked(stream, &server, &shutdown);
968 shutdown.connection_finished();
969 Self::log_connection_result(result);
970 });
971 }
972
973 fn run_basic_accept_loop<I>(incoming: I, server: Server)
974 where
975 I: IntoIterator<Item = io::Result<TcpStream>>,
976 {
977 for stream in incoming {
978 match stream {
979 Ok(stream) => {
980 let server = server.clone();
981 let _ = thread::spawn(move || {
982 Self::log_connection_result(handle_connection(
983 stream, &server,
984 ));
985 });
986 }
987 Err(error) => Self::log_listener_error(error),
988 }
989 }
990 }
991
992 fn run_thread_pool_accept_loop<I>(
993 incoming: I,
994 server: Server,
995 thread_pool: &ThreadPool,
996 ) where
997 I: IntoIterator<Item = io::Result<TcpStream>>,
998 {
999 for stream in incoming {
1000 match stream {
1001 Ok(stream) => {
1002 let server = server.clone();
1003 thread_pool.execute(move || {
1004 Self::log_connection_result(handle_connection(
1005 stream, &server,
1006 ));
1007 });
1008 }
1009 Err(error) => Self::log_listener_error(error),
1010 }
1011 }
1012 }
1013
1014 fn run_pooling_accept_loop<I>(
1015 incoming: I,
1016 server: Server,
1017 thread_pool: &ThreadPool,
1018 connection_pool: Arc<ConnectionPool>,
1019 ) where
1020 I: IntoIterator<Item = io::Result<TcpStream>>,
1021 {
1022 for stream in incoming {
1023 match stream {
1024 Ok(stream) => {
1025 let server = server.clone();
1026 let pool_clone = Arc::clone(&connection_pool);
1027 thread_pool.execute(move || match pool_clone.acquire() {
1028 Ok(_guard) => Self::log_connection_result(
1029 handle_connection(stream, &server),
1030 ),
1031 Err(_) => {
1032 if let Err(error) =
1033 send_service_unavailable(stream)
1034 {
1035 eprintln!(
1036 "Error sending service unavailable: {}",
1037 error
1038 );
1039 }
1040 }
1041 });
1042 }
1043 Err(error) => Self::log_listener_error(error),
1044 }
1045 }
1046 }
1047
    /// Returns the CORS toggle exactly as configured; `None` means it was
    /// never set.
    pub fn cors_enabled(&self) -> Option<bool> {
        self.cors_enabled
    }

    /// Returns the configured CORS origins, if any.
    pub fn cors_origins(&self) -> &Option<Vec<String>> {
        &self.cors_origins
    }

    /// Returns the configured custom response headers, if any.
    pub fn custom_headers(&self) -> &Option<HashMap<String, String>> {
        &self.custom_headers
    }

    /// Returns the configured request timeout, if any.
    pub fn request_timeout(&self) -> Option<Duration> {
        self.request_timeout
    }

    /// Returns the configured connection timeout, if any.
    pub fn connection_timeout(&self) -> Option<Duration> {
        self.connection_timeout
    }

    /// Returns the address the server binds to.
    pub fn address(&self) -> &str {
        &self.address
    }

    /// Returns the document root as configured (not canonicalized).
    pub fn document_root(&self) -> &PathBuf {
        &self.document_root
    }
1084
1085 pub fn canonical_document_root(&self) -> &Path {
1092 if self.canonical_document_root.as_os_str().is_empty() {
1093 &self.document_root
1094 } else {
1095 &self.canonical_document_root
1096 }
1097 }
1098
    /// Returns the effective cap, in bytes, on files served from memory:
    /// the configured value, or `DEFAULT_MAX_BUFFERED_BODY_BYTES` when none
    /// was set.
    pub fn max_buffered_body_bytes(&self) -> u64 {
        self.max_buffered_body_bytes
            .unwrap_or(DEFAULT_MAX_BUFFERED_BODY_BYTES)
    }
1107}
1108
1109fn send_service_unavailable(mut stream: TcpStream) -> io::Result<()> {
1119 let mut response = Response::new(
1120 503,
1121 "SERVICE UNAVAILABLE",
1122 b"Service temporarily unavailable. Please try again later."
1123 .to_vec(),
1124 );
1125
1126 response.add_header("Content-Type", "text/plain");
1127 response.add_header("Retry-After", "1"); response.add_header("Connection", "close");
1129
1130 response.send(&mut stream).map_err(|e| {
1131 use std::io::Error;
1132 Error::other(format!("Failed to send response: {}", e))
1133 })?;
1134 Ok(())
1135}
1136
/// Maximum number of requests served on a single connection before the
/// connection handlers stop looping and return.
pub(crate) const MAX_KEEPALIVE_REQUESTS: usize = 100;

/// Read timeout applied while waiting for the second and later requests on
/// a kept-alive connection.
pub(crate) const KEEPALIVE_IDLE_TIMEOUT: Duration =
    Duration::from_secs(5);

/// Default cap (64 MiB) on the size of a file that is read fully into
/// memory to be served; used when `Server::max_buffered_body_bytes` is
/// unset.
pub const DEFAULT_MAX_BUFFERED_BODY_BYTES: u64 = 64 * 1024 * 1024;
1167
1168#[derive(Clone, Copy, Debug, PartialEq, Eq)]
1177pub(crate) enum ConnectionPolicy {
1178 KeepAlive,
1179 Close,
1180}
1181
1182impl ConnectionPolicy {
1183 pub(crate) fn header_value(self) -> &'static str {
1184 match self {
1185 ConnectionPolicy::KeepAlive => "keep-alive",
1186 ConnectionPolicy::Close => "close",
1187 }
1188 }
1189
1190 pub(crate) fn from_request(request: &Request) -> Self {
1191 let connection_header = request
1192 .header("connection")
1193 .map(|h| h.trim().to_ascii_lowercase());
1194 match request.version() {
1195 "HTTP/1.1" => match connection_header.as_deref() {
1196 Some("close") => ConnectionPolicy::Close,
1197 _ => ConnectionPolicy::KeepAlive,
1198 },
1199 _ => match connection_header.as_deref() {
1200 Some("keep-alive") => ConnectionPolicy::KeepAlive,
1201 _ => ConnectionPolicy::Close,
1202 },
1203 }
1204 }
1205}
1206
1207pub(crate) fn handle_connection(
1223 mut stream: TcpStream,
1224 server: &Server,
1225) -> Result<(), ServerError> {
1226 let _ = stream.set_nodelay(true);
1229 let timeout =
1230 server.request_timeout.unwrap_or(Duration::from_secs(30));
1231 stream.set_read_timeout(Some(timeout))?;
1232 stream.set_write_timeout(Some(timeout))?;
1233
1234 let peer_ip = stream.peer_addr().ok().map(|addr| addr.ip());
1235
1236 for i in 0..MAX_KEEPALIVE_REQUESTS {
1237 if i > 0 {
1238 stream.set_read_timeout(Some(KEEPALIVE_IDLE_TIMEOUT))?;
1243 }
1244 let (mut response, policy) =
1245 build_response_for_stream(server, &stream, peer_ip);
1246 response.set_connection_header(policy.header_value());
1250
1251 if response.send(&mut stream).is_err() {
1252 return Ok(());
1254 }
1255 if policy == ConnectionPolicy::Close {
1256 return Ok(());
1257 }
1258 }
1259 Ok(())
1260}
1261
1262fn handle_connection_tracked(
1277 mut stream: TcpStream,
1278 server: &Server,
1279 _shutdown: &ShutdownSignal,
1280) -> Result<(), ServerError> {
1281 stream.set_nonblocking(false)?;
1283 let _ = stream.set_nodelay(true);
1285
1286 let timeout =
1288 server.connection_timeout.unwrap_or(Duration::from_secs(30));
1289 stream.set_read_timeout(Some(timeout))?;
1290 stream.set_write_timeout(Some(timeout))?;
1291
1292 let peer_ip = stream.peer_addr().ok().map(|addr| addr.ip());
1293
1294 for i in 0..MAX_KEEPALIVE_REQUESTS {
1295 if i > 0 {
1296 stream.set_read_timeout(Some(KEEPALIVE_IDLE_TIMEOUT))?;
1297 }
1298 let (mut response, policy) =
1299 build_response_for_stream(server, &stream, peer_ip);
1300 response.set_connection_header(policy.header_value());
1301 if response.send(&mut stream).is_err() {
1302 return Ok(());
1303 }
1304 if policy == ConnectionPolicy::Close {
1305 return Ok(());
1306 }
1307 }
1308 Ok(())
1309}
1310
1311fn build_response_for_stream(
1312 server: &Server,
1313 stream: &TcpStream,
1314 peer_ip: Option<IpAddr>,
1315) -> (Response, ConnectionPolicy) {
1316 match Request::from_stream(stream) {
1317 Ok(request) => {
1318 let policy = ConnectionPolicy::from_request(&request);
1323 if request.path() == "/metrics" && request.method() == "GET"
1324 {
1325 return (generate_metrics_response(), policy);
1326 }
1327 if let Some(ip) = peer_ip
1328 && is_rate_limited(server, ip)
1329 {
1330 let _ =
1331 METRIC_RATE_LIMITED.fetch_add(1, Ordering::Relaxed);
1332 return (generate_too_many_requests_response(), policy);
1333 }
1334 (
1335 build_response_for_request_with_metrics(
1336 server, &request,
1337 ),
1338 policy,
1339 )
1340 }
1341 Err(error) => (
1342 response_from_error(&error, &server.document_root),
1343 ConnectionPolicy::Close,
1344 ),
1345 }
1346}
1347
1348pub(crate) fn build_response_for_request_with_metrics(
1353 server: &Server,
1354 request: &Request,
1355) -> Response {
1356 let response = build_response_for_request(server, request);
1357 record_metrics(&response);
1358 response
1359}
1360
1361pub(crate) fn build_response_for_request(
1363 server: &Server,
1364 request: &Request,
1365) -> Response {
1366 let generated = match request.method() {
1367 "GET" => generate_response_with_cache(
1368 request,
1369 &server.document_root,
1370 &server.canonical_document_root,
1371 server.static_cache_ttl_secs,
1372 server.max_buffered_body_bytes(),
1373 ),
1374 "HEAD" => {
1375 generate_head_response(request, &server.document_root)
1376 }
1377 "OPTIONS" => generate_options_response(request),
1378 _ => Ok(generate_method_not_allowed_response()),
1379 };
1380 match generated {
1381 Ok(response) => {
1382 apply_response_policies(response, server, request)
1383 }
1384 Err(error) => {
1385 response_from_error(&error, &server.document_root)
1386 }
1387 }
1388}
1389
1390fn record_metrics(response: &Response) {
1391 let _ = METRIC_REQUESTS_TOTAL.fetch_add(1, Ordering::Relaxed);
1392 if (400..500).contains(&response.status_code) {
1393 let _ = METRIC_RESPONSES_4XX.fetch_add(1, Ordering::Relaxed);
1394 } else if response.status_code >= 500 {
1395 let _ = METRIC_RESPONSES_5XX.fetch_add(1, Ordering::Relaxed);
1396 }
1397}
1398
1399fn generate_metrics_response() -> Response {
1400 let body = format!(
1401 "http_handle_requests_total {}\nhttp_handle_responses_4xx_total {}\nhttp_handle_responses_5xx_total {}\nhttp_handle_rate_limited_total {}\n",
1402 METRIC_REQUESTS_TOTAL.load(Ordering::Relaxed),
1403 METRIC_RESPONSES_4XX.load(Ordering::Relaxed),
1404 METRIC_RESPONSES_5XX.load(Ordering::Relaxed),
1405 METRIC_RATE_LIMITED.load(Ordering::Relaxed),
1406 );
1407 let mut response = Response::new(200, "OK", body.into_bytes());
1408 response.add_header("Content-Type", "text/plain; version=0.0.3");
1409 response
1410}
1411
1412fn generate_too_many_requests_response() -> Response {
1413 let mut response = Response::new(
1414 429,
1415 "TOO MANY REQUESTS",
1416 b"Rate limit exceeded".to_vec(),
1417 );
1418 response.add_header("Content-Type", "text/plain");
1419 response.add_header("Retry-After", "60");
1420 response
1421}
1422
1423fn rate_limit_shard_index(ip: IpAddr) -> usize {
1424 use std::collections::hash_map::DefaultHasher;
1425 use std::hash::{Hash, Hasher};
1426 let mut h = DefaultHasher::new();
1427 ip.hash(&mut h);
1428 (h.finish() as usize) & (RATE_LIMIT_SHARDS - 1)
1429}
1430
1431fn rate_limit_table() -> &'static RateLimitTable {
1432 RATE_LIMIT_STATE.get_or_init(|| {
1433 std::array::from_fn(|_| Mutex::new(HashMap::new()))
1434 })
1435}
1436
1437fn is_rate_limited(server: &Server, ip: IpAddr) -> bool {
1438 let Some(limit) = server.rate_limit_per_minute else {
1439 return false;
1440 };
1441 let now = Instant::now();
1442 let shard = &rate_limit_table()[rate_limit_shard_index(ip)];
1446 let mut guard = match shard.lock() {
1447 Ok(guard) => guard,
1448 Err(poisoned) => poisoned.into_inner(),
1449 };
1450 let hits = guard.entry(ip).or_default();
1451 hits.retain(|timestamp| {
1452 now.duration_since(*timestamp) <= Duration::from_secs(60)
1453 });
1454 if hits.len() >= limit {
1455 return true;
1456 }
1457 hits.push(now);
1458 false
1459}
1460
1461fn generate_response(
1472 request: &Request,
1473 document_root: &Path,
1474) -> Result<Response, ServerError> {
1475 let canonical = fs::canonicalize(document_root)
1477 .unwrap_or_else(|_| document_root.to_path_buf());
1478 generate_response_with_cache(
1479 request,
1480 document_root,
1481 &canonical,
1482 None,
1483 DEFAULT_MAX_BUFFERED_BODY_BYTES,
1484 )
1485}
1486
1487fn generate_response_with_cache(
1488 request: &Request,
1489 document_root: &Path,
1490 canonical_root: &Path,
1491 cache_ttl_secs: Option<u64>,
1492 max_buffered_body_bytes: u64,
1493) -> Result<Response, ServerError> {
1494 let mut path = PathBuf::from(document_root);
1495 let request_path = request.path().trim_start_matches('/');
1496
1497 if request_path.is_empty() {
1498 path.push("index.html");
1500 } else {
1501 for component in request_path.split('/') {
1502 if component == ".." {
1503 let _ = path.pop();
1504 } else {
1505 path.push(component);
1506 }
1507 }
1508 }
1509
1510 let within_root = fs::canonicalize(&path)
1511 .map(|candidate| candidate.starts_with(canonical_root))
1512 .unwrap_or_else(|_| path.starts_with(document_root));
1513 if !within_root {
1514 return Err(ServerError::forbidden("Access denied"));
1515 }
1516
1517 if path.is_file() {
1518 serve_file_response(
1519 request,
1520 &path,
1521 cache_ttl_secs,
1522 max_buffered_body_bytes,
1523 )
1524 } else if path.is_dir() {
1525 path.push("index.html");
1527 if path.is_file() {
1528 serve_file_response(
1529 request,
1530 &path,
1531 cache_ttl_secs,
1532 max_buffered_body_bytes,
1533 )
1534 } else {
1535 generate_404_response(document_root)
1536 }
1537 } else {
1538 generate_404_response(document_root)
1539 }
1540}
1541
1542fn serve_file_response(
1543 request: &Request,
1544 path: &Path,
1545 cache_ttl_secs: Option<u64>,
1546 max_buffered_body_bytes: u64,
1547) -> Result<Response, ServerError> {
1548 let mut serving_path = path.to_path_buf();
1549 let mut content_encoding: Option<&'static str> = None;
1550 if let Some(encoding) = request.header("accept-encoding") {
1551 if encoding.contains("br") {
1552 let candidate =
1553 PathBuf::from(format!("{}.br", path.display()));
1554 if candidate.is_file() {
1555 serving_path = candidate;
1556 content_encoding = Some("br");
1557 }
1558 }
1559 if content_encoding.is_none()
1560 && (encoding.contains("zstd") || encoding.contains("zst"))
1561 {
1562 let candidate =
1563 PathBuf::from(format!("{}.zst", path.display()));
1564 if candidate.is_file() {
1565 serving_path = candidate;
1566 content_encoding = Some("zstd");
1567 }
1568 }
1569 if content_encoding.is_none() && encoding.contains("gzip") {
1570 let candidate =
1571 PathBuf::from(format!("{}.gz", path.display()));
1572 if candidate.is_file() {
1573 serving_path = candidate;
1574 content_encoding = Some("gzip");
1575 }
1576 }
1577 }
1578
1579 let serving_metadata =
1585 fs::metadata(&serving_path).map_err(ServerError::from)?;
1586 if serving_metadata.len() > max_buffered_body_bytes {
1587 return Err(ServerError::Custom(format!(
1588 "file exceeds in-memory serve cap ({} > {} bytes); \
1589 override via ServerBuilder::max_buffered_body_bytes or \
1590 wait for v0.1 streaming",
1591 serving_metadata.len(),
1592 max_buffered_body_bytes
1593 )));
1594 }
1595 let contents = fs::read(&serving_path)?;
1596 let metadata = fs::metadata(path)?;
1597 let etag = compute_etag(&metadata);
1598 if request
1599 .header("if-none-match")
1600 .is_some_and(|candidate| candidate == &*etag)
1601 {
1602 let mut response =
1603 Response::new(304, "NOT MODIFIED", Vec::new());
1604 response.add_header("ETag", &etag);
1605 return Ok(response);
1606 }
1607
1608 let content_type = get_content_type(path);
1609 let mut response = if let Some((start, end)) =
1610 parse_range_header(request.header("range"), contents.len())
1611 {
1612 let body = contents[start..=end].to_vec();
1613 let mut partial = Response::new(206, "PARTIAL CONTENT", body);
1614 partial.add_header(
1615 "Content-Range",
1616 &format!("bytes {}-{}/{}", start, end, contents.len()),
1617 );
1618 partial
1619 } else {
1620 Response::new(200, "OK", contents)
1621 };
1622
1623 response.add_header("Content-Type", content_type);
1624 response.add_header("ETag", &etag);
1625 response.add_header("Accept-Ranges", "bytes");
1626 if let Some(encoding) = content_encoding {
1627 response.add_header("Content-Encoding", encoding);
1628 response.add_header("Vary", "Accept-Encoding");
1629 }
1630 if let Some(ttl) = cache_ttl_secs {
1631 response.add_header(
1632 "Cache-Control",
1633 &format!("public, max-age={ttl}"),
1634 );
1635 }
1636 Ok(response)
1637}
1638
1639fn etag_cache() -> &'static EtagCache {
1640 ETAG_CACHE.get_or_init(|| RwLock::new(HashMap::new()))
1641}
1642
1643fn compute_etag(metadata: &fs::Metadata) -> Arc<str> {
1644 let modified = metadata
1645 .modified()
1646 .ok()
1647 .and_then(|time| time.duration_since(UNIX_EPOCH).ok())
1648 .map_or(0_u64, |duration| duration.as_secs());
1649 let len = metadata.len();
1650 let key = (len, modified);
1651
1652 let cache = etag_cache();
1653 if let Ok(read) = cache.read()
1654 && let Some(etag) = read.get(&key)
1655 {
1656 return Arc::clone(etag);
1657 }
1658
1659 let etag: Arc<str> =
1663 Arc::from(format!("W/\"{:x}-{:x}\"", len, modified));
1664
1665 if let Ok(mut write) = cache.write() {
1666 if write.len() >= ETAG_CACHE_MAX {
1667 let drop_count = ETAG_CACHE_MAX / 4;
1673 let to_remove: Vec<_> =
1674 write.keys().take(drop_count).copied().collect();
1675 for k in to_remove {
1676 let _ = write.remove(&k);
1677 }
1678 }
1679 let _ = write.insert(key, Arc::clone(&etag));
1680 }
1681
1682 etag
1683}
1684
/// Parses a single-range `Range` header value against a body of
/// `total_len` bytes.
///
/// Accepted forms (the value must start with the exact prefix `bytes=`):
///
/// * `bytes=A-B` — inclusive positions `A..=B`
/// * `bytes=A-`  — from `A` to the last byte
/// * `bytes=-N`  — the final `N` bytes
///
/// Returns `Some((start, end))` with inclusive, in-bounds positions, or
/// `None` when the header is absent, malformed, lists several ranges, or
/// does not fit inside the body (`start > end`, `end >= total_len`, a zero
/// suffix, or a suffix longer than the body). Out-of-bounds ranges are
/// rejected rather than clamped.
///
/// Positions must consist solely of ASCII digits. `str::parse::<usize>`
/// also accepts a leading `+`, so values such as `bytes=+1-3` used to be
/// honoured; they are now rejected.
fn parse_range_header(
    header: Option<&str>,
    total_len: usize,
) -> Option<(usize, usize)> {
    // Strict decimal parser: digits only, no sign, no whitespace.
    fn parse_position(text: &str) -> Option<usize> {
        if text.is_empty() || !text.bytes().all(|b| b.is_ascii_digit()) {
            return None;
        }
        text.parse().ok()
    }

    let value = header?.strip_prefix("bytes=")?;
    let (start_str, end_str) = value.split_once('-')?;

    if start_str.is_empty() {
        // Suffix form `-N`; a bare `-` fails here because `end_str` is empty.
        let suffix_len = parse_position(end_str)?;
        if suffix_len == 0 || suffix_len > total_len {
            return None;
        }
        return Some((total_len - suffix_len, total_len - 1));
    }

    let start = parse_position(start_str)?;
    let end = if end_str.is_empty() {
        // Open-ended range; an empty body has no last byte to point at.
        total_len.checked_sub(1)?
    } else {
        parse_position(end_str)?
    };

    if start > end || end >= total_len {
        return None;
    }
    Some((start, end))
}
1713
1714fn generate_404_response(
1724 document_root: &Path,
1725) -> Result<Response, ServerError> {
1726 let not_found_path = document_root.join("404/index.html");
1727 let contents = if not_found_path.is_file() {
1728 fs::read(not_found_path)?
1729 } else {
1730 b"404 Not Found".to_vec()
1731 };
1732 let mut response = Response::new(404, "NOT FOUND", contents);
1733 response.add_header("Content-Type", "text/html");
1734 Ok(response)
1735}
1736
1737fn generate_head_response(
1752 request: &Request,
1753 document_root: &Path,
1754) -> Result<Response, ServerError> {
1755 let full_response = generate_response(request, document_root)?;
1757
1758 let mut head_response = Response::new(
1760 full_response.status_code,
1761 &full_response.status_text,
1762 Vec::new(), );
1764
1765 for (name, value) in &full_response.headers {
1767 head_response.add_header(name, value);
1768 }
1769
1770 let content_length = full_response.body.len().to_string();
1772 head_response.add_header("Content-Length", &content_length);
1773
1774 Ok(head_response)
1775}
1776
1777fn generate_options_response(
1790 _request: &Request,
1791) -> Result<Response, ServerError> {
1792 let mut response = Response::new(200, "OK", Vec::new());
1793 response.add_header("Allow", "GET, HEAD, OPTIONS");
1794 response.add_header("Content-Length", "0");
1795 Ok(response)
1796}
1797
1798fn generate_method_not_allowed_response() -> Response {
1807 let mut response = Response::new(
1808 405,
1809 "METHOD NOT ALLOWED",
1810 b"Method Not Allowed".to_vec(),
1811 );
1812 response.add_header("Allow", "GET, HEAD, OPTIONS");
1813 response.add_header("Content-Type", "text/plain");
1814 response.add_header("Content-Length", "18");
1815 response
1816}
1817
1818fn response_from_error(
1819 error: &ServerError,
1820 document_root: &Path,
1821) -> Response {
1822 match error {
1823 ServerError::InvalidRequest(message) => {
1824 let mut response = Response::new(
1825 400,
1826 "BAD REQUEST",
1827 message.as_bytes().to_vec(),
1828 );
1829 response.add_header("Content-Type", "text/plain");
1830 response
1831 }
1832 ServerError::Forbidden(message) => {
1833 let mut response = Response::new(
1834 403,
1835 "FORBIDDEN",
1836 message.as_bytes().to_vec(),
1837 );
1838 response.add_header("Content-Type", "text/plain");
1839 response
1840 }
1841 ServerError::NotFound(_) => {
1842 generate_404_response(document_root).unwrap_or_else(|_| {
1843 let mut response = Response::new(
1844 404,
1845 "NOT FOUND",
1846 b"404 Not Found".to_vec(),
1847 );
1848 response.add_header("Content-Type", "text/plain");
1849 response
1850 })
1851 }
1852 ServerError::Io(_)
1853 | ServerError::Custom(_)
1854 | ServerError::TaskFailed(_) => {
1855 let mut response = Response::new(
1856 500,
1857 "INTERNAL SERVER ERROR",
1858 b"Internal Server Error".to_vec(),
1859 );
1860 response.add_header("Content-Type", "text/plain");
1861 response
1862 }
1863 }
1864}
1865
1866fn apply_response_policies(
1867 mut response: Response,
1868 server: &Server,
1869 request: &Request,
1870) -> Response {
1871 if let Some(headers) = server.custom_headers.as_ref() {
1872 for (name, value) in headers {
1873 response.add_header(name, value);
1874 }
1875 }
1876
1877 if server.cors_enabled.unwrap_or(false) {
1878 let allow_origin = server
1879 .cors_origins
1880 .as_ref()
1881 .and_then(|origins| origins.first())
1882 .map(String::as_str)
1883 .unwrap_or("*");
1884 response
1885 .add_header("Access-Control-Allow-Origin", allow_origin);
1886 response.add_header(
1887 "Access-Control-Allow-Methods",
1888 "GET, HEAD, OPTIONS",
1889 );
1890 response.add_header("Access-Control-Allow-Headers", "*");
1891
1892 if request.method().eq_ignore_ascii_case("OPTIONS") {
1893 response.add_header("Access-Control-Max-Age", "600");
1894 }
1895 }
1896
1897 if let Some(ttl) = server.static_cache_ttl_secs {
1898 let has_cache_control =
1899 response.headers.iter().any(|(name, _)| {
1900 name.eq_ignore_ascii_case("cache-control")
1901 });
1902 if !has_cache_control {
1903 if is_probably_immutable_asset_path(request.path()) {
1904 response.add_header(
1905 "Cache-Control",
1906 "public, max-age=31536000, immutable",
1907 );
1908 } else {
1909 response.add_header(
1910 "Cache-Control",
1911 &format!("public, max-age={ttl}"),
1912 );
1913 }
1914 }
1915 }
1916
1917 response
1918}
1919
/// Heuristically decides whether `path` names a content-hashed asset.
///
/// Looks at the last `/`-separated segment, strips everything from its
/// final `.` onwards, and then takes the text after the last `-` of what
/// remains (or all of it when there is no `-`). The path qualifies when
/// that text is at least eight characters long and made only of ASCII hex
/// digits. Segments without a `.` never qualify.
fn is_probably_immutable_asset_path(path: &str) -> bool {
    let file_name = match path.rfind('/') {
        Some(slash) => &path[slash + 1..],
        None => path,
    };
    let stem = match file_name.rfind('.') {
        Some(dot) => &file_name[..dot],
        None => return false,
    };
    let candidate =
        stem.rfind('-').map_or(stem, |dash| &stem[dash + 1..]);

    candidate.len() >= 8
        && candidate.bytes().all(|byte| byte.is_ascii_hexdigit())
}
1930
/// Maps a file path to the MIME type sent in `Content-Type`.
///
/// The decision is based only on the final extension of `path`, compared
/// ASCII case-insensitively (so `PHOTO.JPG` is treated like `photo.jpg`;
/// previously any upper-case extension fell through to the default).
/// Paths without an extension, with a non-UTF-8 extension, or with an
/// unknown extension yield `application/octet-stream`.
fn get_content_type(path: &Path) -> &'static str {
    let extension = path
        .extension()
        .and_then(std::ffi::OsStr::to_str)
        .map(str::to_ascii_lowercase);

    match extension.as_deref() {
        // Text and source formats.
        Some("html" | "htm") => "text/html",
        Some("css") => "text/css",
        Some("js" | "mjs") => "application/javascript",
        Some("ts") => "application/typescript",
        Some("json") => "application/json",
        Some("xml") => "application/xml",
        Some("txt") => "text/plain",
        Some("md" | "markdown") => "text/markdown",
        Some("yaml" | "yml") => "application/x-yaml",
        Some("toml") => "application/toml",

        // Images.
        Some("png") => "image/png",
        Some("jpg" | "jpeg") => "image/jpeg",
        Some("gif") => "image/gif",
        Some("svg") => "image/svg+xml",
        Some("ico") => "image/x-icon",
        Some("webp") => "image/webp",
        Some("avif") => "image/avif",
        // `.heif` deliberately shares the HEIC type; existing tests rely on it.
        Some("heic" | "heif") => "image/heic",
        Some("jxl") => "image/jxl",
        Some("bmp") => "image/bmp",
        Some("tiff" | "tif") => "image/tiff",

        // WebAssembly.
        Some("wasm") => "application/wasm",

        // Fonts.
        Some("woff") => "font/woff",
        Some("woff2") => "font/woff2",
        Some("ttf") => "font/ttf",
        Some("otf") => "font/otf",
        Some("eot") => "application/vnd.ms-fontobject",

        // Audio.
        Some("mp3") => "audio/mpeg",
        Some("wav") => "audio/wav",
        Some("ogg") => "audio/ogg",
        Some("opus") => "audio/opus",
        Some("flac") => "audio/flac",
        Some("m4a") => "audio/mp4",
        Some("aac") => "audio/aac",

        // Video.
        Some("mp4") => "video/mp4",
        Some("webm") => "video/webm",
        Some("av1") => "video/av1",
        Some("avi") => "video/x-msvideo",
        Some("mov") => "video/quicktime",

        // Documents and archives.
        Some("pdf") => "application/pdf",
        Some("zip") => "application/zip",
        Some("tar") => "application/x-tar",
        Some("gz") => "application/gzip",

        // Web tooling artefacts.
        Some("map") => "application/json",
        Some("webmanifest") => "application/manifest+json",

        _ => "application/octet-stream",
    }
}
2009
2010#[cfg(test)]
2011mod tests {
2012 use super::*;
2013 use std::fs::File;
2014 use std::io;
2015 use std::io::Read;
2016 use std::io::Write;
2017 use std::net::{TcpListener, TcpStream};
2018 use tempfile::TempDir;
2019
2020 fn setup_test_directory() -> TempDir {
2021 let temp_dir = TempDir::new().unwrap();
2022 let root_path = temp_dir.path();
2023
2024 let mut index_file =
2026 File::create(root_path.join("index.html")).unwrap();
2027 index_file
2028 .write_all(b"<html><body>Hello, World!</body></html>")
2029 .unwrap();
2030
2031 fs::create_dir(root_path.join("404")).unwrap();
2033 let mut not_found_file =
2034 File::create(root_path.join("404/index.html")).unwrap();
2035 not_found_file
2036 .write_all(b"<html><body>404 Not Found</body></html>")
2037 .unwrap();
2038
2039 fs::create_dir(root_path.join("subdir")).unwrap();
2041 let mut subdir_index_file =
2042 File::create(root_path.join("subdir/index.html")).unwrap();
2043 subdir_index_file
2044 .write_all(b"<html><body>Subdirectory Index</body></html>")
2045 .unwrap();
2046
2047 temp_dir
2048 }
2049
2050 fn roundtrip_handle_connection(
2051 server: &Server,
2052 request: &[u8],
2053 ) -> String {
2054 let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
2055 let addr = listener.local_addr().expect("addr");
2056 let server_clone = server.clone();
2057 let handle = thread::spawn(move || {
2058 let (stream, _) = listener.accept().expect("accept");
2059 handle_connection(stream, &server_clone).expect("handle");
2060 });
2061
2062 let mut client = TcpStream::connect(addr).expect("connect");
2063 let request_text = std::str::from_utf8(request).unwrap_or("");
2070 if request_text.to_ascii_lowercase().contains("connection:") {
2071 client.write_all(request).expect("write");
2072 } else {
2073 let with_close =
2076 if let Some(idx) = request_text.rfind("\r\n\r\n") {
2077 let (head, tail) = request_text.split_at(idx);
2078 format!("{head}\r\nConnection: close{tail}")
2079 } else {
2080 request_text.to_string()
2081 };
2082 client.write_all(with_close.as_bytes()).expect("write");
2083 }
2084 let mut response = String::new();
2085 let _ = client.read_to_string(&mut response).expect("read");
2086 handle.join().expect("join");
2087 response
2088 }
2089
2090 fn connected_pair() -> (TcpStream, TcpStream) {
2091 let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
2092 let addr = listener.local_addr().expect("addr");
2093 let client = TcpStream::connect(addr).expect("connect");
2094 let (server, _) = listener.accept().expect("accept");
2095 (server, client)
2096 }
2097
2098 #[test]
2099 fn test_server_creation() {
2100 let server = Server::new("127.0.0.1:8080", "/var/www");
2101 assert_eq!(server.address, "127.0.0.1:8080");
2102 assert_eq!(server.document_root, PathBuf::from("/var/www"));
2103 }
2104
2105 #[test]
2106 fn test_get_content_type() {
2107 assert_eq!(
2108 get_content_type(Path::new("test.html")),
2109 "text/html"
2110 );
2111 assert_eq!(
2112 get_content_type(Path::new("page.htm")),
2113 "text/html"
2114 );
2115 assert_eq!(
2116 get_content_type(Path::new("style.css")),
2117 "text/css"
2118 );
2119 assert_eq!(
2120 get_content_type(Path::new("script.js")),
2121 "application/javascript"
2122 );
2123 assert_eq!(
2124 get_content_type(Path::new("data.json")),
2125 "application/json"
2126 );
2127 assert_eq!(
2128 get_content_type(Path::new("image.png")),
2129 "image/png"
2130 );
2131 assert_eq!(
2132 get_content_type(Path::new("photo.jpg")),
2133 "image/jpeg"
2134 );
2135 assert_eq!(
2136 get_content_type(Path::new("animation.gif")),
2137 "image/gif"
2138 );
2139 assert_eq!(
2140 get_content_type(Path::new("icon.svg")),
2141 "image/svg+xml"
2142 );
2143 assert_eq!(
2144 get_content_type(Path::new("unknown.xyz")),
2145 "application/octet-stream"
2146 );
2147 }
2148
2149 #[test]
2150 fn test_modern_content_types() {
2151 assert_eq!(
2153 get_content_type(Path::new("image.webp")),
2154 "image/webp"
2155 );
2156 assert_eq!(
2157 get_content_type(Path::new("image.avif")),
2158 "image/avif"
2159 );
2160 assert_eq!(
2161 get_content_type(Path::new("image.heic")),
2162 "image/heic"
2163 );
2164 assert_eq!(
2165 get_content_type(Path::new("image.heif")),
2166 "image/heic"
2167 );
2168 assert_eq!(
2169 get_content_type(Path::new("image.jxl")),
2170 "image/jxl"
2171 );
2172
2173 assert_eq!(
2175 get_content_type(Path::new("module.wasm")),
2176 "application/wasm"
2177 );
2178
2179 assert_eq!(
2181 get_content_type(Path::new("script.ts")),
2182 "application/typescript"
2183 );
2184 assert_eq!(
2185 get_content_type(Path::new("module.mjs")),
2186 "application/javascript"
2187 );
2188 assert_eq!(
2189 get_content_type(Path::new("README.md")),
2190 "text/markdown"
2191 );
2192 assert_eq!(
2193 get_content_type(Path::new("config.yaml")),
2194 "application/x-yaml"
2195 );
2196 assert_eq!(
2197 get_content_type(Path::new("config.yml")),
2198 "application/x-yaml"
2199 );
2200 assert_eq!(
2201 get_content_type(Path::new("Cargo.toml")),
2202 "application/toml"
2203 );
2204
2205 assert_eq!(
2207 get_content_type(Path::new("audio.opus")),
2208 "audio/opus"
2209 );
2210 assert_eq!(
2211 get_content_type(Path::new("audio.flac")),
2212 "audio/flac"
2213 );
2214
2215 assert_eq!(
2217 get_content_type(Path::new("video.av1")),
2218 "video/av1"
2219 );
2220
2221 assert_eq!(
2223 get_content_type(Path::new("script.js.map")),
2224 "application/json"
2225 );
2226 assert_eq!(
2227 get_content_type(Path::new("manifest.webmanifest")),
2228 "application/manifest+json"
2229 );
2230 }
2231
2232 #[test]
2233 fn test_generate_response() {
2234 let temp_dir = setup_test_directory();
2235 let document_root = temp_dir.path();
2236
2237 let root_request = Request {
2239 method: "GET".to_string(),
2240 path: "/".to_string(),
2241 version: "HTTP/1.1".to_string(),
2242 headers: Vec::new(),
2243 };
2244
2245 let root_response =
2246 generate_response(&root_request, document_root).unwrap();
2247 assert_eq!(root_response.status_code, 200);
2248 assert_eq!(root_response.status_text, "OK");
2249 assert!(
2250 root_response.body.starts_with(
2251 b"<html><body>Hello, World!</body></html>"
2252 )
2253 );
2254
2255 let file_request = Request {
2257 method: "GET".to_string(),
2258 path: "/index.html".to_string(),
2259 version: "HTTP/1.1".to_string(),
2260 headers: Vec::new(),
2261 };
2262
2263 let file_response =
2264 generate_response(&file_request, document_root).unwrap();
2265 assert_eq!(file_response.status_code, 200);
2266 assert_eq!(file_response.status_text, "OK");
2267 assert!(
2268 file_response.body.starts_with(
2269 b"<html><body>Hello, World!</body></html>"
2270 )
2271 );
2272
2273 let subdir_request = Request {
2275 method: "GET".to_string(),
2276 path: "/subdir/".to_string(),
2277 version: "HTTP/1.1".to_string(),
2278 headers: Vec::new(),
2279 };
2280
2281 let subdir_response =
2282 generate_response(&subdir_request, document_root).unwrap();
2283 assert_eq!(subdir_response.status_code, 200);
2284 assert_eq!(subdir_response.status_text, "OK");
2285 assert!(subdir_response.body.starts_with(
2286 b"<html><body>Subdirectory Index</body></html>"
2287 ));
2288
2289 let not_found_request = Request {
2291 method: "GET".to_string(),
2292 path: "/nonexistent.html".to_string(),
2293 version: "HTTP/1.1".to_string(),
2294 headers: Vec::new(),
2295 };
2296
2297 let not_found_response =
2298 generate_response(¬_found_request, document_root)
2299 .unwrap();
2300 assert_eq!(not_found_response.status_code, 404);
2301 assert_eq!(not_found_response.status_text, "NOT FOUND");
2302 assert!(
2303 not_found_response.body.starts_with(
2304 b"<html><body>404 Not Found</body></html>"
2305 )
2306 );
2307
2308 let traversal_request = Request {
2310 method: "GET".to_string(),
2311 path: "/../outside.html".to_string(),
2312 version: "HTTP/1.1".to_string(),
2313 headers: Vec::new(),
2314 };
2315
2316 let traversal_response =
2317 generate_response(&traversal_request, document_root);
2318 assert!(matches!(
2319 traversal_response,
2320 Err(ServerError::Forbidden(_))
2321 ));
2322 }
2323
2324 #[test]
2325 fn test_server_builder() {
2326 let server = Server::builder()
2328 .address("127.0.0.1:8080")
2329 .document_root("/var/www")
2330 .enable_cors()
2331 .custom_header("X-Custom", "test-value")
2332 .request_timeout(Duration::from_secs(30))
2333 .build()
2334 .unwrap();
2335
2336 assert_eq!(server.address, "127.0.0.1:8080");
2337 assert_eq!(server.document_root, PathBuf::from("/var/www"));
2338 assert_eq!(server.cors_enabled, Some(true));
2339 assert_eq!(
2340 server.request_timeout,
2341 Some(Duration::from_secs(30))
2342 );
2343
2344 let headers = server.custom_headers.unwrap();
2346 assert_eq!(
2347 headers.get("X-Custom"),
2348 Some(&"test-value".to_string())
2349 );
2350
2351 let result = ServerBuilder::new().build();
2353 assert!(result.is_err());
2354
2355 let server_with_origins = Server::builder()
2357 .address("127.0.0.1:9000")
2358 .document_root("/tmp")
2359 .cors_origins(vec!["https://example.com".to_string()])
2360 .build()
2361 .unwrap();
2362
2363 assert_eq!(server_with_origins.cors_enabled, Some(true));
2364 assert_eq!(
2365 server_with_origins.cors_origins,
2366 Some(vec!["https://example.com".to_string()])
2367 );
2368 }
2369
2370 #[test]
2371 fn test_graceful_shutdown() {
2372 let shutdown = ShutdownSignal::new(Duration::from_secs(5));
2374
2375 assert!(!shutdown.is_shutdown_requested());
2377 assert_eq!(shutdown.active_connection_count(), 0);
2378
2379 shutdown.connection_started();
2381 assert_eq!(shutdown.active_connection_count(), 1);
2382
2383 shutdown.connection_started();
2384 assert_eq!(shutdown.active_connection_count(), 2);
2385
2386 shutdown.connection_finished();
2387 assert_eq!(shutdown.active_connection_count(), 1);
2388
2389 shutdown.connection_finished();
2390 assert_eq!(shutdown.active_connection_count(), 0);
2391
2392 shutdown.shutdown();
2394 assert!(shutdown.is_shutdown_requested());
2395
2396 let graceful = shutdown.wait_for_shutdown();
2398 assert!(graceful);
2399 }
2400
2401 #[test]
2402 fn test_shutdown_signal_timeout() {
2403 let shutdown = ShutdownSignal::new(Duration::from_millis(100));
2404
2405 shutdown.connection_started();
2407 shutdown.shutdown();
2408
2409 let graceful = shutdown.wait_for_shutdown();
2411 assert!(!graceful); }
2413
2414 #[test]
2415 fn test_thread_pool() {
2416 use std::sync::Arc;
2417 use std::sync::atomic::AtomicUsize;
2418 use std::sync::mpsc;
2419
2420 let pool = ThreadPool::new(4);
2421 let counter = Arc::new(AtomicUsize::new(0));
2422 let (tx, rx) = mpsc::channel();
2423
2424 for _ in 0..10 {
2426 let counter_clone = Arc::clone(&counter);
2427 let tx_clone = tx.clone();
2428
2429 pool.execute(move || {
2430 let _ = counter_clone.fetch_add(1, Ordering::SeqCst);
2431 tx_clone.send(()).unwrap();
2432 });
2433 }
2434
2435 for _ in 0..10 {
2437 rx.recv().unwrap();
2438 }
2439
2440 assert_eq!(counter.load(Ordering::SeqCst), 10);
2441 }
2442
2443 #[test]
2444 fn test_connection_pool() {
2445 let pool = ConnectionPool::new(2);
2446 assert_eq!(pool.active_count(), 0);
2447
2448 let guard1 = pool.acquire().unwrap();
2450 assert_eq!(pool.active_count(), 1);
2451
2452 let guard2 = pool.acquire().unwrap();
2454 assert_eq!(pool.active_count(), 2);
2455
2456 let result = pool.acquire();
2458 assert!(result.is_err());
2459 assert_eq!(pool.active_count(), 2);
2460
2461 drop(guard1);
2463 assert_eq!(pool.active_count(), 1);
2464
2465 let _guard3 = pool.acquire().unwrap();
2467 assert_eq!(pool.active_count(), 2);
2468
2469 drop(guard2);
2471 drop(_guard3);
2472 assert_eq!(pool.active_count(), 0);
2473 }
2474
2475 #[test]
2476 fn test_thread_pool_concurrent_execution() {
2477 use std::sync::Arc;
2478 use std::sync::atomic::AtomicUsize;
2479 use std::sync::mpsc;
2480 use std::time::Instant;
2481
2482 let pool = ThreadPool::new(4);
2483 let counter = Arc::new(AtomicUsize::new(0));
2484 let (tx, rx) = mpsc::channel();
2485
2486 let start_time = Instant::now();
2487
2488 for i in 0..100 {
2490 let counter_clone = Arc::clone(&counter);
2491 let tx_clone = tx.clone();
2492
2493 pool.execute(move || {
2494 thread::sleep(Duration::from_millis(10));
2496 let _ = counter_clone.fetch_add(1, Ordering::SeqCst);
2497 tx_clone.send(i).unwrap();
2498 });
2499 }
2500
2501 for _ in 0..100 {
2503 let _ = rx.recv().unwrap();
2504 }
2505
2506 let elapsed = start_time.elapsed();
2507 assert_eq!(counter.load(Ordering::SeqCst), 100);
2508
2509 assert!(
2511 elapsed.as_millis() < 800,
2512 "Thread pool should provide concurrency benefits"
2513 );
2514 }
2515
2516 #[test]
2517 fn test_connection_pool_backpressure() {
2518 let pool = ConnectionPool::new(2);
2519
2520 let _guard1 = pool.acquire().unwrap();
2522 let _guard2 = pool.acquire().unwrap();
2523 assert_eq!(pool.active_count(), 2);
2524
2525 let result = pool.acquire();
2527 assert!(result.is_err());
2528 assert_eq!(
2529 result.unwrap_err().kind(),
2530 io::ErrorKind::WouldBlock
2531 );
2532 }
2533
2534 #[test]
2535 fn test_service_unavailable_response() {
2536 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
2538 let addr = listener.local_addr().unwrap();
2539
2540 let _ = thread::spawn(move || {
2541 let (stream, _) = listener.accept().unwrap();
2542 send_service_unavailable(stream).unwrap();
2543 });
2544
2545 let mut client_stream = TcpStream::connect(addr).unwrap();
2546 let mut response = String::new();
2547 let _ = client_stream.read_to_string(&mut response).unwrap();
2548
2549 assert!(response.contains("503"));
2550 assert!(response.contains("SERVICE UNAVAILABLE"));
2551 assert!(response.contains("Service temporarily unavailable"));
2552 assert!(response.contains("Retry-After: 1"));
2553 }
2554
2555 #[test]
2556 fn test_service_unavailable_send_failure_is_mapped() {
2557 use std::net::Shutdown;
2558
2559 let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
2560 let addr = listener.local_addr().expect("addr");
2561
2562 let t = thread::spawn(move || {
2563 let (stream, _) = listener.accept().expect("accept");
2564 stream.shutdown(Shutdown::Write).expect("shutdown");
2565 let err =
2566 send_service_unavailable(stream).expect_err("err");
2567 assert!(
2568 err.to_string().contains("Failed to send response")
2569 );
2570 });
2571
2572 let _client = TcpStream::connect(addr).expect("connect");
2573 t.join().expect("join");
2574 }
2575
2576 #[test]
2577 fn test_response_from_error_variants() {
2578 let temp_dir = setup_test_directory();
2579 let root = temp_dir.path();
2580
2581 let bad = response_from_error(
2582 &ServerError::InvalidRequest("bad".to_string()),
2583 root,
2584 );
2585 assert_eq!(bad.status_code, 400);
2586
2587 let forbidden = response_from_error(
2588 &ServerError::Forbidden("no".to_string()),
2589 root,
2590 );
2591 assert_eq!(forbidden.status_code, 403);
2592
2593 let not_found = response_from_error(
2594 &ServerError::NotFound("missing".to_string()),
2595 root,
2596 );
2597 assert_eq!(not_found.status_code, 404);
2598
2599 let internal = response_from_error(
2600 &ServerError::TaskFailed("boom".to_string()),
2601 root,
2602 );
2603 assert_eq!(internal.status_code, 500);
2604 }
2605
2606 #[test]
2607 fn test_apply_response_policies_with_cors_and_headers() {
2608 let mut headers = HashMap::new();
2609 let _ = headers
2610 .insert("X-App".to_string(), "http-handle".to_string());
2611 let server = Server::builder()
2612 .address("127.0.0.1:0")
2613 .document_root(".")
2614 .enable_cors()
2615 .cors_origins(vec!["https://example.com".to_string()])
2616 .custom_headers(headers)
2617 .build()
2618 .expect("server builder");
2619
2620 let request = Request {
2621 method: "OPTIONS".to_string(),
2622 path: "/".to_string(),
2623 version: "HTTP/1.1".to_string(),
2624 headers: Vec::new(),
2625 };
2626 let response = apply_response_policies(
2627 Response::new(200, "OK", Vec::new()),
2628 &server,
2629 &request,
2630 );
2631
2632 let has_origin = response.headers.iter().any(|(k, v)| {
2633 k.eq_ignore_ascii_case("Access-Control-Allow-Origin")
2634 && v == "https://example.com"
2635 });
2636 let has_custom = response
2637 .headers
2638 .iter()
2639 .any(|(k, v)| k == "X-App" && v == "http-handle");
2640 let has_max_age = response.headers.iter().any(|(k, _)| {
2641 k.eq_ignore_ascii_case("Access-Control-Max-Age")
2642 });
2643
2644 assert!(has_origin);
2645 assert!(has_custom);
2646 assert!(has_max_age);
2647 }
2648
2649 #[test]
2650 fn test_thread_pool_debug_representation() {
2651 let pool = ThreadPool::new(1);
2652 let rendered = format!("{pool:?}");
2653 assert!(rendered.contains("ThreadPool"));
2654 assert!(rendered.contains("<Sender<Job>>"));
2655 }
2656
2657 #[test]
2658 fn test_server_getters_expose_builder_config() {
2659 let mut headers = HashMap::new();
2660 let _ =
2661 headers.insert("X-Test".to_string(), "value".to_string());
2662
2663 let server = Server::builder()
2664 .address("127.0.0.1:9001")
2665 .document_root("/tmp")
2666 .enable_cors()
2667 .cors_origins(vec!["https://example.com".to_string()])
2668 .custom_headers(headers)
2669 .request_timeout(Duration::from_secs(5))
2670 .connection_timeout(Duration::from_secs(7))
2671 .build()
2672 .expect("server");
2673
2674 assert_eq!(server.cors_enabled(), Some(true));
2675 assert_eq!(
2676 server.cors_origins(),
2677 &Some(vec!["https://example.com".to_string()])
2678 );
2679 assert_eq!(
2680 server.request_timeout(),
2681 Some(Duration::from_secs(5))
2682 );
2683 assert_eq!(
2684 server.connection_timeout(),
2685 Some(Duration::from_secs(7))
2686 );
2687 assert_eq!(server.address(), "127.0.0.1:9001");
2688 assert_eq!(server.document_root(), &PathBuf::from("/tmp"));
2689 assert_eq!(
2690 server
2691 .custom_headers()
2692 .as_ref()
2693 .and_then(|h| h.get("X-Test")),
2694 Some(&"value".to_string())
2695 );
2696 }
2697
2698 #[test]
2699 fn test_start_variants_return_bind_errors_for_in_use_address() {
2700 let occupied = TcpListener::bind("127.0.0.1:0").expect("bind");
2701 let addr = occupied.local_addr().expect("addr").to_string();
2702 let server = Server::new(&addr, ".");
2703
2704 assert!(server.start().is_err());
2705 assert!(
2706 server
2707 .start_with_graceful_shutdown(Duration::from_millis(1))
2708 .is_err()
2709 );
2710 assert!(server.start_with_thread_pool(1).is_err());
2711 assert!(server.start_with_pooling(1, 1).is_err());
2712 }
2713
2714 #[test]
2715 fn test_start_with_shutdown_signal_and_ready_reports_bound_address()
2716 {
2717 let root = setup_test_directory();
2718 let server = Server::builder()
2719 .address("127.0.0.1:0")
2720 .document_root(root.path().to_str().expect("path"))
2721 .build()
2722 .expect("server");
2723
2724 let (ready_tx, ready_rx) = mpsc::channel::<String>();
2725 let shutdown =
2726 Arc::new(ShutdownSignal::new(Duration::from_secs(2)));
2727 let shutdown_for_server = shutdown.clone();
2728 let server_for_thread = server.clone();
2729
2730 let handle = thread::spawn(move || {
2731 server_for_thread
2732 .start_with_shutdown_signal_and_ready(
2733 shutdown_for_server,
2734 move |addr| {
2735 let _ = ready_tx.send(addr);
2736 },
2737 )
2738 .expect("server run");
2739 });
2740
2741 let bound_addr = ready_rx
2742 .recv_timeout(Duration::from_secs(2))
2743 .expect("bound address");
2744 assert!(bound_addr.starts_with("127.0.0.1:"));
2745 assert_ne!(bound_addr, "127.0.0.1:0");
2746
2747 let mut stream =
2748 TcpStream::connect(&bound_addr).expect("connect");
2749 stream
2750 .write_all(
2751 b"GET /index.html HTTP/1.1\r\nHost: localhost\r\n\r\n",
2752 )
2753 .expect("write");
2754 let mut response = String::new();
2755 let _ = stream.read_to_string(&mut response);
2756 assert!(response.starts_with("HTTP/1.1 200 OK"));
2757
2758 shutdown.shutdown();
2759 handle.join().expect("join");
2760 }
2761
2762 #[test]
2763 fn test_generate_response_falls_back_to_builtin_404_without_page() {
2764 let temp_dir = TempDir::new().expect("tmp");
2765 fs::write(temp_dir.path().join("index.html"), b"index")
2766 .expect("write");
2767 fs::create_dir(temp_dir.path().join("empty-dir")).expect("dir");
2768
2769 let request = Request {
2770 method: "GET".to_string(),
2771 path: "/empty-dir/".to_string(),
2772 version: "HTTP/1.1".to_string(),
2773 headers: Vec::new(),
2774 };
2775
2776 let response = generate_response(&request, temp_dir.path())
2777 .expect("response");
2778 assert_eq!(response.status_code, 404);
2779 assert_eq!(response.body, b"404 Not Found".to_vec());
2780 }
2781
2782 #[cfg(unix)]
2783 #[test]
2784 fn test_response_from_error_not_found_fallback_when_custom_404_unreadable()
2785 {
2786 use std::os::unix::fs::PermissionsExt;
2787
2788 let temp_dir = TempDir::new().expect("tmp");
2789 let custom_404_dir = temp_dir.path().join("404");
2790 fs::create_dir(&custom_404_dir).expect("create 404 dir");
2791 let custom_404 = custom_404_dir.join("index.html");
2792 fs::write(&custom_404, b"custom").expect("write 404");
2793 fs::set_permissions(
2794 &custom_404,
2795 fs::Permissions::from_mode(0o000),
2796 )
2797 .expect("chmod");
2798
2799 let response = response_from_error(
2800 &ServerError::NotFound("missing".to_string()),
2801 temp_dir.path(),
2802 );
2803
2804 assert_eq!(response.status_code, 404);
2805 assert_eq!(response.status_text, "NOT FOUND");
2806 assert_eq!(response.body, b"404 Not Found".to_vec());
2807 }
2808
2809 #[test]
2810 fn test_handle_connection_options_and_parse_error_paths() {
2811 let root = setup_test_directory();
2812 let root_str = root.path().to_str().expect("root path");
2813 let server = Server::builder()
2814 .address("127.0.0.1:0")
2815 .document_root(root_str)
2816 .build()
2817 .expect("server");
2818
2819 let options_response = roundtrip_handle_connection(
2820 &server,
2821 b"OPTIONS / HTTP/1.1\r\nHost: localhost\r\n\r\n",
2822 );
2823 assert!(options_response.starts_with("HTTP/1.1 200 OK"));
2824 assert!(options_response.contains("Allow: GET, HEAD, OPTIONS"));
2825
2826 let head_response = roundtrip_handle_connection(
2827 &server,
2828 b"HEAD / HTTP/1.1\r\nHost: localhost\r\n\r\n",
2829 );
2830 assert!(head_response.starts_with("HTTP/1.1 200 OK"));
2831 assert!(head_response.contains("Content-Length:"));
2832
2833 let method_not_allowed = roundtrip_handle_connection(
2834 &server,
2835 b"POST / HTTP/1.1\r\nHost: localhost\r\n\r\n",
2836 );
2837 assert!(
2838 method_not_allowed
2839 .starts_with("HTTP/1.1 405 METHOD NOT ALLOWED")
2840 );
2841
2842 let traversal_response = roundtrip_handle_connection(
2843 &server,
2844 b"GET /../outside HTTP/1.1\r\nHost: localhost\r\n\r\n",
2845 );
2846 assert!(
2847 traversal_response.starts_with("HTTP/1.1 403 FORBIDDEN")
2848 );
2849
2850 let bad_response =
2851 roundtrip_handle_connection(&server, b"BAD\r\n\r\n");
2852 assert!(bad_response.starts_with("HTTP/1.1 400 BAD REQUEST"));
2853 }
2854
2855 #[test]
2856 fn test_generate_response_supports_etag_and_range() {
2857 let temp_dir = setup_test_directory();
2858 let root = temp_dir.path();
2859
2860 let headers =
2861 vec![("range".to_string(), "bytes=0-4".to_string())];
2862 let range_request = Request {
2863 method: "GET".to_string(),
2864 path: "/index.html".to_string(),
2865 version: "HTTP/1.1".to_string(),
2866 headers,
2867 };
2868 let range_response =
2869 generate_response(&range_request, root).expect("range");
2870 assert_eq!(range_response.status_code, 206);
2871 assert!(range_response.body.starts_with(b"<html"));
2872 let etag = range_response
2873 .headers
2874 .iter()
2875 .find(|(name, _)| name.eq_ignore_ascii_case("etag"))
2876 .map(|(_, value)| value.clone())
2877 .expect("etag");
2878
2879 let headers = vec![("if-none-match".to_string(), etag)];
2880 let conditional_request = Request {
2881 method: "GET".to_string(),
2882 path: "/index.html".to_string(),
2883 version: "HTTP/1.1".to_string(),
2884 headers,
2885 };
2886 let conditional_response =
2887 generate_response(&conditional_request, root)
2888 .expect("conditional");
2889 assert_eq!(conditional_response.status_code, 304);
2890 assert!(conditional_response.body.is_empty());
2891 }
2892
2893 #[test]
2894 fn test_metrics_and_rate_limiting() {
2895 let root = setup_test_directory();
2896 let server = Server::builder()
2897 .address("127.0.0.1:0")
2898 .document_root(root.path().to_str().expect("path"))
2899 .rate_limit_per_minute(1)
2900 .build()
2901 .expect("server");
2902
2903 let _ = roundtrip_handle_connection(
2904 &server,
2905 b"GET /index.html HTTP/1.1\r\nHost: localhost\r\n\r\n",
2906 );
2907 let limited = roundtrip_handle_connection(
2908 &server,
2909 b"GET /index.html HTTP/1.1\r\nHost: localhost\r\n\r\n",
2910 );
2911 assert!(limited.starts_with("HTTP/1.1 429 TOO MANY REQUESTS"));
2912
2913 let metrics = roundtrip_handle_connection(
2914 &server,
2915 b"GET /metrics HTTP/1.1\r\nHost: localhost\r\n\r\n",
2916 );
2917 assert!(metrics.starts_with("HTTP/1.1 200 OK"));
2918 assert!(metrics.contains("http_handle_requests_total"));
2919 }
2920
2921 #[test]
2922 fn test_trigger_shutdown_from_slot_helper() {
2923 let shutdown =
2924 Arc::new(ShutdownSignal::new(Duration::from_secs(1)));
2925 let slot = Mutex::new(Some(shutdown.clone()));
2926 assert!(!shutdown.is_shutdown_requested());
2927 Server::trigger_shutdown_from_slot(&slot);
2928 assert!(shutdown.is_shutdown_requested());
2929 }
2930
2931 #[test]
2932 fn test_handle_shutdown_signal_helper() {
2933 let shutdown =
2934 Arc::new(ShutdownSignal::new(Duration::from_secs(1)));
2935 let slot =
2936 SHUTDOWN_SIGNAL_SLOT.get_or_init(|| Mutex::new(None));
2937 if let Ok(mut guard) = slot.lock() {
2938 *guard = Some(shutdown.clone());
2939 }
2940 Server::handle_shutdown_signal();
2941 assert!(shutdown.is_shutdown_requested());
2942 }
2943
    #[test]
    /// Drives the three accept-loop helpers (`run_basic_accept_loop`,
    /// `run_thread_pool_accept_loop`, `run_pooling_accept_loop`) with
    /// hand-built `incoming` vectors instead of a real listener.
    ///
    /// Each helper is fed one `Err` item and, separately, one `Ok` stream
    /// carrying a malformed request. A final case runs the pooling loop
    /// with `ConnectionPool::new(0)` and expects a `503`.
    fn test_accept_loop_helpers_cover_ok_and_err_paths() {
        let root = setup_test_directory();
        let server = Server::builder()
            .address("127.0.0.1:0")
            .document_root(root.path().to_str().expect("path"))
            .build()
            .expect("server");

        // Err path: nothing is asserted; each call only has to return
        // without panicking when the incoming item is an error.
        Server::run_basic_accept_loop(
            vec![Err(io::Error::other("incoming failed"))],
            server.clone(),
        );
        // The same single-worker pool is shared by every pooled call below.
        let pool = ThreadPool::new(1);
        Server::run_thread_pool_accept_loop(
            vec![Err(io::Error::other("incoming failed"))],
            server.clone(),
            &pool,
        );
        Server::run_pooling_accept_loop(
            vec![Err(io::Error::other("incoming failed"))],
            server.clone(),
            &pool,
            Arc::new(ConnectionPool::new(1)),
        );

        // Ok path, basic loop: the request is written before the loop runs,
        // and the malformed request line must be answered with 400.
        let (server_stream, mut client) = connected_pair();
        client.write_all(b"BAD\r\n\r\n").expect("write");
        Server::run_basic_accept_loop(
            vec![Ok(server_stream)],
            server.clone(),
        );
        let mut response = String::new();
        // `read_to_string` reads until EOF, so this relies on the server
        // side closing the stream after responding.
        let _ = client.read_to_string(&mut response).expect("read");
        assert!(response.starts_with("HTTP/1.1 400 BAD REQUEST"));

        // Ok path, thread-pool loop: same malformed request, same 400.
        let (server_stream, mut client) = connected_pair();
        client.write_all(b"BAD\r\n\r\n").expect("write");
        Server::run_thread_pool_accept_loop(
            vec![Ok(server_stream)],
            server.clone(),
            &pool,
        );
        let mut response = String::new();
        let _ = client.read_to_string(&mut response).expect("read");
        assert!(response.starts_with("HTTP/1.1 400 BAD REQUEST"));

        // Ok path, pooling loop with a pool built from `1`: still a 400.
        let (server_stream, mut client) = connected_pair();
        client.write_all(b"BAD\r\n\r\n").expect("write");
        Server::run_pooling_accept_loop(
            vec![Ok(server_stream)],
            server.clone(),
            &pool,
            Arc::new(ConnectionPool::new(1)),
        );
        let mut response = String::new();
        let _ = client.read_to_string(&mut response).expect("read");
        assert!(response.starts_with("HTTP/1.1 400 BAD REQUEST"));

        // Pooling loop with a pool built from `0`: no request is written,
        // yet a 503 is expected. NOTE(review): presumably `0` means no
        // connection slot is available -- confirm against `ConnectionPool`.
        let (server_stream, mut client) = connected_pair();
        Server::run_pooling_accept_loop(
            vec![Ok(server_stream)],
            server,
            &pool,
            Arc::new(ConnectionPool::new(0)),
        );
        let mut response = String::new();
        let _ = client.read_to_string(&mut response).expect("read");
        assert!(
            response.starts_with("HTTP/1.1 503 SERVICE UNAVAILABLE")
        );
    }
3016
3017 #[test]
3018 fn test_immutable_cache_control_policy() {
3019 let root = setup_test_directory();
3020 let server = Server::builder()
3021 .address("127.0.0.1:0")
3022 .document_root(root.path().to_str().expect("path"))
3023 .static_cache_ttl_secs(60)
3024 .build()
3025 .expect("server");
3026
3027 let request = Request {
3028 method: "GET".to_string(),
3029 path: "/assets/app-abcdef12.js".to_string(),
3030 version: "HTTP/1.1".to_string(),
3031 headers: Vec::new(),
3032 };
3033 let response = apply_response_policies(
3034 Response::new(200, "OK", b"ok".to_vec()),
3035 &server,
3036 &request,
3037 );
3038 assert!(response.headers.iter().any(|(name, value)| {
3039 name.eq_ignore_ascii_case("cache-control")
3040 && value.contains("immutable")
3041 }));
3042 }
3043
3044 #[test]
3045 fn test_zstd_precompressed_asset_is_served() {
3046 let root = setup_test_directory();
3047 let file = root.path().join("index.html.zst");
3048 fs::write(&file, b"zstd-data").expect("write");
3049
3050 let headers = vec![(
3051 "accept-encoding".to_string(),
3052 "zstd,gzip".to_string(),
3053 )];
3054 let request = Request {
3055 method: "GET".to_string(),
3056 path: "/index.html".to_string(),
3057 version: "HTTP/1.1".to_string(),
3058 headers,
3059 };
3060
3061 let response = generate_response_with_cache(
3062 &request,
3063 root.path(),
3064 &fs::canonicalize(root.path()).expect("canonicalize"),
3065 None,
3066 DEFAULT_MAX_BUFFERED_BODY_BYTES,
3067 )
3068 .expect("response");
3069 assert!(response.headers.iter().any(|(name, value)| {
3070 name.eq_ignore_ascii_case("content-encoding")
3071 && value.eq_ignore_ascii_case("zstd")
3072 }));
3073 assert_eq!(response.body, b"zstd-data");
3074 }
3075
3076 #[test]
3077 fn test_brotli_precompressed_asset_is_served() {
3078 let root = setup_test_directory();
3079 fs::write(root.path().join("index.html.br"), b"brotli-encoded")
3080 .expect("write br");
3081
3082 let headers = vec![(
3083 "accept-encoding".to_string(),
3084 "br, gzip".to_string(),
3085 )];
3086 let request = Request {
3087 method: "GET".to_string(),
3088 path: "/index.html".to_string(),
3089 version: "HTTP/1.1".to_string(),
3090 headers,
3091 };
3092
3093 let response = generate_response_with_cache(
3094 &request,
3095 root.path(),
3096 &fs::canonicalize(root.path()).expect("canonicalize"),
3097 None,
3098 DEFAULT_MAX_BUFFERED_BODY_BYTES,
3099 )
3100 .expect("response");
3101 assert!(response.headers.iter().any(|(name, value)| {
3102 name.eq_ignore_ascii_case("content-encoding")
3103 && value.eq_ignore_ascii_case("br")
3104 }));
3105 assert_eq!(response.body, b"brotli-encoded");
3106 }
3107
3108 #[test]
3109 fn test_gzip_precompressed_asset_is_served() {
3110 let root = setup_test_directory();
3111 fs::write(root.path().join("index.html.gz"), b"gzdata")
3112 .expect("write gz");
3113
3114 let headers =
3115 vec![("accept-encoding".to_string(), "gzip".to_string())];
3116 let request = Request {
3117 method: "GET".to_string(),
3118 path: "/index.html".to_string(),
3119 version: "HTTP/1.1".to_string(),
3120 headers,
3121 };
3122
3123 let response = generate_response_with_cache(
3124 &request,
3125 root.path(),
3126 &fs::canonicalize(root.path()).expect("canonicalize"),
3127 None,
3128 DEFAULT_MAX_BUFFERED_BODY_BYTES,
3129 )
3130 .expect("response");
3131 assert!(response.headers.iter().any(|(name, value)| {
3132 name.eq_ignore_ascii_case("content-encoding")
3133 && value.eq_ignore_ascii_case("gzip")
3134 }));
3135 assert_eq!(response.body, b"gzdata");
3136 }
3137
3138 #[test]
3139 fn test_serve_file_response_applies_cache_ttl() {
3140 let root = setup_test_directory();
3141 let request = Request {
3142 method: "GET".to_string(),
3143 path: "/index.html".to_string(),
3144 version: "HTTP/1.1".to_string(),
3145 headers: Vec::new(),
3146 };
3147
3148 let response = generate_response_with_cache(
3149 &request,
3150 root.path(),
3151 &fs::canonicalize(root.path()).expect("canonicalize"),
3152 Some(600),
3153 DEFAULT_MAX_BUFFERED_BODY_BYTES,
3154 )
3155 .expect("response");
3156 assert!(response.headers.iter().any(|(name, value)| {
3157 name.eq_ignore_ascii_case("cache-control")
3158 && value.contains("max-age=600")
3159 }));
3160 }
3161
3162 #[test]
3163 fn test_parse_range_header_covers_all_branches() {
3164 assert!(parse_range_header(None, 100).is_none());
3166 assert!(parse_range_header(Some("items=0-1"), 100).is_none());
3167 assert!(
3168 parse_range_header(Some("bytes=no-dash"), 100).is_none()
3169 );
3170 assert!(parse_range_header(Some("bytes=-"), 100).is_none());
3172 assert_eq!(
3174 parse_range_header(Some("bytes=-10"), 100),
3175 Some((90, 99))
3176 );
3177 assert!(parse_range_header(Some("bytes=-0"), 100).is_none());
3179 assert!(parse_range_header(Some("bytes=-500"), 100).is_none());
3180 assert_eq!(
3182 parse_range_header(Some("bytes=10-"), 100),
3183 Some((10, 99))
3184 );
3185 assert!(parse_range_header(Some("bytes=0-"), 0).is_none());
3187 assert!(parse_range_header(Some("bytes=50-10"), 100).is_none());
3189 assert!(
3191 parse_range_header(Some("bytes=0-9999"), 100).is_none()
3192 );
3193 assert_eq!(
3195 parse_range_header(Some("bytes=0-9"), 100),
3196 Some((0, 9))
3197 );
3198 assert!(parse_range_header(Some("bytes=abc-9"), 100).is_none());
3200 assert!(parse_range_header(Some("bytes=0-abc"), 100).is_none());
3201 assert!(parse_range_header(Some("bytes=-abc"), 100).is_none());
3202 }
3203
3204 #[test]
3205 fn test_non_immutable_cache_control_policy_uses_ttl() {
3206 let root = setup_test_directory();
3207 let server = Server::builder()
3208 .address("127.0.0.1:0")
3209 .document_root(root.path().to_str().expect("path"))
3210 .static_cache_ttl_secs(90)
3211 .build()
3212 .expect("server");
3213
3214 let request = Request {
3215 method: "GET".to_string(),
3216 path: "/index.html".to_string(),
3217 version: "HTTP/1.1".to_string(),
3218 headers: Vec::new(),
3219 };
3220 let response = apply_response_policies(
3221 Response::new(200, "OK", b"ok".to_vec()),
3222 &server,
3223 &request,
3224 );
3225 assert!(response.headers.iter().any(|(name, value)| {
3226 name.eq_ignore_ascii_case("cache-control")
3227 && value == "public, max-age=90"
3228 }));
3229 }
3230
3231 #[test]
3232 fn test_cache_control_policy_respects_existing_header() {
3233 let root = setup_test_directory();
3234 let server = Server::builder()
3235 .address("127.0.0.1:0")
3236 .document_root(root.path().to_str().expect("path"))
3237 .static_cache_ttl_secs(90)
3238 .build()
3239 .expect("server");
3240
3241 let mut existing = Response::new(200, "OK", b"ok".to_vec());
3242 existing.add_header("Cache-Control", "no-store");
3243
3244 let request = Request {
3245 method: "GET".to_string(),
3246 path: "/anything.txt".to_string(),
3247 version: "HTTP/1.1".to_string(),
3248 headers: Vec::new(),
3249 };
3250 let response =
3251 apply_response_policies(existing, &server, &request);
3252 let header = response
3253 .headers
3254 .iter()
3255 .find(|(name, _)| {
3256 name.eq_ignore_ascii_case("cache-control")
3257 })
3258 .map(|(_, value)| value.clone())
3259 .expect("cache-control");
3260 assert_eq!(header, "no-store");
3261 }
3262
3263 #[test]
3264 fn test_is_probably_immutable_asset_path_edge_cases() {
3265 assert!(is_probably_immutable_asset_path(
3266 "/assets/app-abcdef12.js"
3267 ));
3268 assert!(!is_probably_immutable_asset_path("/noext"));
3270 assert!(!is_probably_immutable_asset_path(
3272 "/assets/app-zzzzzzzz.js"
3273 ));
3274 assert!(!is_probably_immutable_asset_path("/assets/app-ab.js"));
3276 }
3277
3278 #[test]
3279 fn test_record_metrics_tracks_5xx_responses() {
3280 let before = METRIC_RESPONSES_5XX.load(Ordering::Relaxed);
3281 let response =
3282 Response::new(503, "SERVICE UNAVAILABLE", b"down".to_vec());
3283 record_metrics(&response);
3284 let after = METRIC_RESPONSES_5XX.load(Ordering::Relaxed);
3285 assert!(after > before);
3286 }
3287
    #[test]
    /// Poisons the rate-limit shard that `127.0.0.1` maps to, then calls
    /// `is_rate_limited` for that address.
    ///
    /// The result of `is_rate_limited` is discarded: the test passes as
    /// long as the call does not panic on the poisoned lock.
    fn test_rate_limit_recovers_from_poisoned_mutex() {
        let ip: IpAddr = "127.0.0.1".parse().expect("ip");
        let shard = &rate_limit_table()[rate_limit_shard_index(ip)];
        // Panicking while the guard is held poisons the mutex;
        // `catch_unwind` keeps that panic from failing the test.
        let _ = std::panic::catch_unwind(|| {
            let _guard = shard.lock().expect("lock");
            panic!("intentional to poison");
        });
        assert!(shard.is_poisoned());

        let root = setup_test_directory();
        let server = Server::builder()
            .address("127.0.0.1:0")
            .document_root(root.path().to_str().expect("path"))
            .rate_limit_per_minute(10)
            .build()
            .expect("server");
        // Only "does not panic" is checked; the verdict itself is ignored.
        let _ = is_rate_limited(&server, ip);

        // Reset the poison flag before returning. NOTE(review): presumably
        // the table is shared with other tests, so leaving it poisoned
        // would leak state -- confirm against `rate_limit_table`.
        shard.clear_poison();
    }
3314
3315 #[test]
3316 fn test_log_connection_result_handles_error() {
3317 Server::log_connection_result(Err(
3320 ServerError::invalid_request("boom"),
3321 ));
3322 }
3323
    #[test]
    /// Starts a server on a background thread, opens a client connection
    /// that is never written to, requests shutdown, and checks that the
    /// server thread finishes without panicking.
    ///
    /// There is no explicit assertion. The server thread calls
    /// `.expect("server start")`, so an `Err` from the start call panics
    /// that thread and fails the final `join`.
    fn test_start_with_shutdown_signal_reports_active_connections_on_timeout()
    {
        let root = setup_test_directory();
        let server = Server::builder()
            .address("127.0.0.1:0")
            .document_root(root.path().to_str().expect("path"))
            .build()
            .expect("server");

        // NOTE(review): the 50 ms argument is presumably the shutdown
        // (drain) timeout -- confirm against `ShutdownSignal::new`.
        let shutdown =
            Arc::new(ShutdownSignal::new(Duration::from_millis(50)));
        // The ready callback reports the address string back to this thread.
        let (ready_tx, ready_rx) = mpsc::channel::<String>();
        let shutdown_for_server = shutdown.clone();
        let server_clone = server.clone();
        let handle = thread::spawn(move || {
            server_clone
                .start_with_shutdown_signal_and_ready(
                    shutdown_for_server,
                    move |addr| {
                        // A send failure (receiver gone) is ignored.
                        let _ = ready_tx.send(addr);
                    },
                )
                .expect("server start");
        });

        // Wait at most two seconds for the server to report its address.
        let addr = ready_rx
            .recv_timeout(Duration::from_secs(2))
            .expect("ready");

        // Kept open, with no request sent, until the end of the test.
        // NOTE(review): presumably this is the "active connection" the test
        // name refers to -- confirm against the shutdown implementation.
        let _holder = TcpStream::connect(&addr).expect("connect");
        // Brief pause before signalling shutdown; presumably gives the
        // server time to accept the connection (not verified here).
        thread::sleep(Duration::from_millis(20));
        shutdown.shutdown();

        handle.join().expect("join server thread");
    }
3365
3366 #[test]
3367 fn test_start_with_thread_pool_serves_one_connection() {
3368 let root = setup_test_directory();
3369 let probe = TcpListener::bind("127.0.0.1:0").expect("probe");
3370 let addr = probe.local_addr().expect("addr");
3371 drop(probe);
3372
3373 let server = Server::builder()
3374 .address(&addr.to_string())
3375 .document_root(root.path().to_str().expect("path"))
3376 .build()
3377 .expect("server");
3378
3379 let _handle = thread::spawn(move || {
3380 let _ = server.start_with_thread_pool(2);
3381 });
3382
3383 let mut stream = None;
3385 for _ in 0..50 {
3386 if let Ok(s) = TcpStream::connect(addr.to_string()) {
3387 stream = Some(s);
3388 break;
3389 }
3390 thread::sleep(Duration::from_millis(20));
3391 }
3392 let mut stream = stream.expect("server did not bind");
3393 stream
3394 .write_all(
3395 b"GET /index.html HTTP/1.1\r\nHost: localhost\r\n\r\n",
3396 )
3397 .expect("write");
3398 let mut response = String::new();
3399 let _ = stream.read_to_string(&mut response).expect("read");
3400 assert!(response.starts_with("HTTP/1.1 200 OK"));
3401 }
3403
3404 #[test]
3405 fn test_start_with_pooling_serves_one_connection() {
3406 let root = setup_test_directory();
3407 let probe = TcpListener::bind("127.0.0.1:0").expect("probe");
3408 let addr = probe.local_addr().expect("addr");
3409 drop(probe);
3410
3411 let server = Server::builder()
3412 .address(&addr.to_string())
3413 .document_root(root.path().to_str().expect("path"))
3414 .build()
3415 .expect("server");
3416
3417 let _handle = thread::spawn(move || {
3418 let _ = server.start_with_pooling(2, 4);
3419 });
3420
3421 let mut stream = None;
3422 for _ in 0..50 {
3423 if let Ok(s) = TcpStream::connect(addr.to_string()) {
3424 stream = Some(s);
3425 break;
3426 }
3427 thread::sleep(Duration::from_millis(20));
3428 }
3429 let mut stream = stream.expect("server did not bind");
3430 stream
3431 .write_all(
3432 b"GET /index.html HTTP/1.1\r\nHost: localhost\r\n\r\n",
3433 )
3434 .expect("write");
3435 let mut response = String::new();
3436 let _ = stream.read_to_string(&mut response).expect("read");
3437 assert!(response.starts_with("HTTP/1.1 200 OK"));
3438 }
3439
3440 #[test]
3449 fn test_server_builder_disable_cors_setter() {
3450 let server = Server::builder()
3451 .address("127.0.0.1:0")
3452 .document_root(".")
3453 .enable_cors()
3454 .disable_cors()
3455 .build()
3456 .expect("server");
3457 assert_eq!(server.cors_enabled(), Some(false));
3458 }
3459
3460 #[test]
3463 fn test_server_builder_max_buffered_body_bytes_override() {
3464 let server = Server::builder()
3465 .address("127.0.0.1:0")
3466 .document_root(".")
3467 .max_buffered_body_bytes(1_000_000)
3468 .build()
3469 .expect("server");
3470 assert_eq!(server.max_buffered_body_bytes(), 1_000_000);
3471 }
3472
3473 #[test]
3475 fn test_server_max_buffered_body_bytes_default_fallback() {
3476 let server = Server::builder()
3477 .address("127.0.0.1:0")
3478 .document_root(".")
3479 .build()
3480 .expect("server");
3481 assert_eq!(
3482 server.max_buffered_body_bytes(),
3483 DEFAULT_MAX_BUFFERED_BODY_BYTES
3484 );
3485 }
3486
3487 #[test]
3489 fn test_shutdown_signal_default_constructor() {
3490 let signal = ShutdownSignal::default();
3491 assert_eq!(signal.shutdown_timeout, Duration::from_secs(30));
3492 assert!(!signal.is_shutdown_requested());
3493 }
3494
3495 #[test]
3499 fn test_serve_file_response_rejects_oversize_body() {
3500 let root = setup_test_directory();
3501 let request = Request {
3502 method: "GET".to_string(),
3503 path: "/index.html".to_string(),
3504 version: "HTTP/1.1".to_string(),
3505 headers: Vec::new(),
3506 };
3507 let canonical =
3508 fs::canonicalize(root.path()).expect("canonicalize");
3509 let err = generate_response_with_cache(
3510 &request,
3511 root.path(),
3512 &canonical,
3513 None,
3514 1, )
3516 .expect_err("oversized file must be rejected");
3517 assert!(
3518 err.to_string().contains("exceeds in-memory serve cap"),
3519 "unexpected error: {err}"
3520 );
3521 }
3522
3523 #[test]
3527 fn test_etag_cache_evicts_when_full() {
3528 let cache = etag_cache();
3529 if let Ok(mut write) = cache.write() {
3531 for i in 0..(ETAG_CACHE_MAX + 1) as u64 {
3532 let _ = write.insert(
3533 (i, i),
3534 Arc::<str>::from(format!("W/\"{i:x}-{i:x}\"")),
3535 );
3536 }
3537 }
3538 let len_before =
3540 cache.read().map(|r| r.len()).unwrap_or_default();
3541 assert!(len_before >= ETAG_CACHE_MAX);
3542
3543 let temp = std::env::temp_dir();
3547 let metadata = fs::metadata(&temp).expect("metadata");
3548 let _ = compute_etag(&metadata);
3549
3550 let len_after =
3551 cache.read().map(|r| r.len()).unwrap_or_default();
3552 assert!(
3555 len_after <= ETAG_CACHE_MAX,
3556 "cache len {len_after} exceeds cap {ETAG_CACHE_MAX}"
3557 );
3558 }
3559
3560 #[test]
3567 fn connection_policy_handles_http_1_0_explicit_keepalive_and_default_close()
3568 {
3569 let keepalive = Request {
3570 method: "GET".into(),
3571 path: "/".into(),
3572 version: "HTTP/1.0".into(),
3573 headers: vec![("connection".into(), "keep-alive".into())],
3574 };
3575 assert_eq!(
3576 ConnectionPolicy::from_request(&keepalive),
3577 ConnectionPolicy::KeepAlive
3578 );
3579
3580 let bare = Request {
3581 method: "GET".into(),
3582 path: "/".into(),
3583 version: "HTTP/1.0".into(),
3584 headers: Vec::new(),
3585 };
3586 assert_eq!(
3587 ConnectionPolicy::from_request(&bare),
3588 ConnectionPolicy::Close
3589 );
3590 }
3591
3592 #[test]
3599 fn canonical_document_root_falls_back_when_cache_is_empty() {
3600 let mut server = Server {
3606 document_root: PathBuf::from("/tmp/some-root"),
3607 canonical_document_root: PathBuf::new(),
3608 ..Server::default()
3609 };
3610 assert_eq!(
3611 server.canonical_document_root(),
3612 Path::new("/tmp/some-root")
3613 );
3614
3615 server.canonical_document_root =
3618 PathBuf::from("/canonical/elsewhere");
3619 assert_eq!(
3620 server.canonical_document_root(),
3621 Path::new("/canonical/elsewhere")
3622 );
3623 }
3624}