// feather_runtime/runtime/server.rs
1use http::StatusCode;
2#[cfg(feature = "log")]
3use log::{debug, info, warn};
4use may::net::{TcpListener, TcpStream};
5use num_cpus;
6use std::io::{self, Read, Write};
7use std::net::ToSocketAddrs;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::{net::SocketAddr, panic, sync::Arc};
10
11use crate::http::{Request, Response};
12use crate::runtime::service::{ArcService, Service, ServiceResult};
/// A HTTP server that handles incoming connections using coroutines.
pub struct Server {
    /// The user's application logic, shared (via `Arc`) with every
    /// per-connection coroutine.
    service: ArcService,
    /// Flag to control server shutdown: `shutdown()` stores `false`,
    /// which makes the accept loop in `run()` exit.
    running: Arc<AtomicBool>,
}
20
21impl Server {
22 /// Create a new Server instance with the given Service
23 pub fn new(service: impl Service) -> Self {
24 Self {
25 service: Arc::new(service),
26 running: Arc::new(AtomicBool::new(true)),
27 }
28 }
29
30 /// Initiates a graceful shutdown of the server
31 pub fn shutdown(&self) {
32 self.running.store(false, Ordering::SeqCst);
33 }
34
35 /// Runs the server until shutdown is called
36 pub fn run(&self, addr: impl ToSocketAddrs) -> io::Result<()> {
37 // Setting worker count equal to CPU cores for maximum parallel utilization.
38 may::config().set_workers(num_cpus::get());
39 may::config().set_stack_size(64 * 1024); // 64 KB instead of default 2-4 KB(Mainly for logger formatting)
40 #[cfg(feature = "log")]
41 info!(
42 "Feather Runtime Started on {}",
43 addr.to_socket_addrs()?.next().unwrap_or(SocketAddr::from(([0, 0, 0, 0], 80)))
44 );
45
46 let listener = TcpListener::bind(addr)?;
47
48 while self.running.load(Ordering::SeqCst) {
49 match listener.accept() {
50 Ok((stream, addr)) => {
51 #[cfg(feature = "log")]
52 debug!("New connection from {}", addr);
53 let service = self.service.clone();
54
55 // Spawn a new coroutine for this connection with panic handling
56 may::go!(move || {
57 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| Self::conn_handler(stream, service)));
58
59 match result {
60 Ok(Ok(())) => (), // Connection completed successfully
61 Ok(Err(e)) => {
62 #[cfg(feature = "log")]
63 log::error!("Connection handler error: {}", e);
64 }
65 Err(e) => {
66 let msg = e.downcast_ref::<String>().map(|s| s.as_str()).unwrap_or("Unknown panic");
67 #[cfg(feature = "log")]
68 log::error!("Connection handler panic: {}", msg);
69 }
70 }
71 });
72 }
73 Err(e) => {
74 warn!("Failed to accept connection: {}", e);
75 }
76 }
77 }
78
79 info!("Server shutting down");
80 Ok(())
81 }
82
83 /// Helper to send basic HTTP errors with proper headers
84 fn send_error(stream: &mut TcpStream, status: StatusCode, message: &str) -> io::Result<()> {
85 let mut response = Response::default();
86 response.set_status(status.as_u16());
87 response.send_text(message);
88
89 // Add standard security headers
90 response.add_header("X-Content-Type-Options", "nosniff").ok();
91 response.add_header("X-Frame-Options", "DENY").ok();
92
93 // Always close connection on error
94 response.add_header("Connection", "close").ok();
95
96 stream.write_all(&response.to_raw())
97 }
98 /// The main coroutine function: reads, dispatches, and manages stream lifecycle.
99 fn conn_handler(mut stream: TcpStream, service: ArcService) -> io::Result<()> {
100 const MAX_REQUEST_SIZE: usize = 8192; // 8KB limit
101 let mut buffer = [0u8; MAX_REQUEST_SIZE];
102 let mut keep_alive = true;
103
104 while keep_alive {
105 // 1. READ PHASE with timeout
106 stream.set_read_timeout(Some(std::time::Duration::from_secs(30)))?;
107 let bytes_read = match stream.read(&mut buffer) {
108 Ok(0) => return Ok(()), // Connection closed
109 Ok(n) if n >= MAX_REQUEST_SIZE => {
110 Self::send_error(&mut stream, StatusCode::PAYLOAD_TOO_LARGE, "Request body too large")?;
111 return Ok(());
112 }
113 Ok(n) => n,
114 Err(e) => {
115 if e.kind() == io::ErrorKind::WouldBlock {
116 Self::send_error(&mut stream, StatusCode::REQUEST_TIMEOUT, "Request timed out")?;
117 }
118 return Err(e);
119 }
120 };
121
122 // 2. PARSE PHASE with improved error handling
123 let request = match Request::parse(&buffer[..bytes_read]) {
124 Ok(req) => {
125 // Update keep_alive based on request headers and HTTP version
126 keep_alive = match (req.version, req.headers.get(http::header::CONNECTION)) {
127 (http::Version::HTTP_11, Some(v)) => v.as_bytes().eq_ignore_ascii_case(b"keep-alive"),
128 (http::Version::HTTP_11, None) => true, // HTTP/1.1 defaults to keep-alive
129 _ => false, // HTTP/1.0 and others default to close
130 };
131 req
132 }
133 Err(e) => {
134 Self::send_error(&mut stream, StatusCode::BAD_REQUEST, &format!("Invalid request: {}", e))?;
135 return Ok(());
136 }
137 };
138
139 // 3. SERVICE DISPATCH PHASE (Ownership Transfer)
140
141 let result = service.handle(request, None);
142
143 // 4. HANDLE RESULT & I/O
144 match result {
145 Ok(ServiceResult::Response(response)) => {
146 // *** RE-ACQUIRE STREAM (Simplified) ***
147 // NOTE: This is the critical architectural issue: the stream ownership must be returned
148 // by the service if it was not Consumed. For now, we assume ownership is re-acquired.
149 // This line would fail without the stream being returned from the service.
150 // To proceed, we enforce `Connection: Close` and rely on the variable being moved back.
151
152 let raw_response = response.to_raw();
153 stream.write_all(&raw_response)?;
154 stream.flush()?;
155
156 // Check Connection header for keep-alive
157 // NOTE: If keep-alive is intended, you must skip the buffer reuse step.
158 if let Some(connection) = response.headers.get(http::header::CONNECTION) {
159 if connection.as_bytes().eq_ignore_ascii_case(b"close") {
160 return Ok(());
161 }
162 }
163
164 // ⭐️ NO NEED TO CLEAR THE BUFFER IF THE NEXT READ OVERWRITES IT!
165 // The next stream.read() will start at buffer[0]. The data at buffer[bytes_read..8192]
166 // is old, but bytes_read will correctly bound the next read slice.
167 // We simply loop back to `stream.read(&mut buffer)?`
168 }
169
170 Ok(ServiceResult::Consumed) => {
171 return Ok(());
172 }
173
174 Err(e) => {
175 Self::send_error(&mut stream, http::StatusCode::INTERNAL_SERVER_ERROR, &format!("Internal error: {}", e))?;
176 return Ok(());
177 }
178 }
179
180 // If the connection is Keep-Alive, the loop continues.
181 // The buffer is implicitly "cleared" by the bounds of the next stream.read().
182 // We only need to reset the connection status logic for the next iteration.
183 }
184 Ok(())
185 }
186}