feather_runtime/runtime/server.rs

use bytes::Bytes;
use http::StatusCode;
#[cfg(feature = "log")]
use log::{debug, info, warn};
use may::net::{TcpListener, TcpStream};
use num_cpus;
use std::io::{self, Read, Write};
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::atomic::{AtomicBool, Ordering};
use std::{panic, sync::Arc};

use crate::http::{Request, Response};
use crate::runtime::service::{ArcService, Service, ServiceResult};

/// Configuration for the HTTP server
#[derive(Clone, Debug)]
pub struct ServerConfig {
    /// Maximum request body size in bytes (default: 8192 = 8KB)
    pub max_body_size: usize,
    /// Read timeout in seconds (default: 30)
    pub read_timeout_secs: u64,
    /// Number of worker threads (default: number of CPU cores)
    pub workers: usize,
    /// Stack size per coroutine in bytes (default: 65536 = 64KB)
    pub stack_size: usize,
}

impl Default for ServerConfig {
    fn default() -> Self {
        Self {
            max_body_size: 8192,
            read_timeout_secs: 30,
            workers: num_cpus::get(),
            stack_size: 64 * 1024,
        }
    }
}

/// An HTTP server that handles incoming connections using coroutines
pub struct Server {
    /// The user's application logic
    service: ArcService,
    /// Flag to control server shutdown
    running: Arc<AtomicBool>,
    /// Server configuration
    config: ServerConfig,
}

impl Server {
    /// Create a new Server instance with the given Service
    pub fn new(service: impl Service, max_body_size: usize) -> Self {
        let config = ServerConfig {
            max_body_size,
            ..ServerConfig::default()
        };
        Self {
            service: Arc::new(service),
            running: Arc::new(AtomicBool::new(true)),
            config,
        }
    }

    /// Create a new Server instance with custom configuration
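    ///
    /// A minimal usage sketch (not compiled here); `MyService` is a
    /// hypothetical placeholder for a type implementing this crate's
    /// `Service` trait:
    ///
    /// ```ignore
    /// let config = ServerConfig {
    ///     max_body_size: 1024 * 1024, // assumed limit of 1 MiB for the example
    ///     ..ServerConfig::default()
    /// };
    /// let server = Server::with_config(MyService, config);
    /// ```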
    pub fn with_config(service: impl Service, config: ServerConfig) -> Self {
        Self {
            service: Arc::new(service),
            running: Arc::new(AtomicBool::new(true)),
            config,
        }
    }

    /// Initiates a graceful shutdown of the server
    pub fn shutdown(&self) {
        self.running.store(false, Ordering::SeqCst);
    }

    /// Runs the server until shutdown is called
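    ///
    /// A minimal sketch (not compiled here), again assuming a hypothetical
    /// `MyService` that implements `Service`:
    ///
    /// ```ignore
    /// let server = Server::new(MyService, 8192);
    /// server.run("127.0.0.1:8080")?;
    /// ```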
    pub fn run(&self, addr: impl ToSocketAddrs) -> io::Result<()> {
        // Configure coroutine runtime
        may::config().set_workers(self.config.workers);
        may::config().set_stack_size(self.config.stack_size);
        #[cfg(feature = "log")]
        info!(
            "Feather Runtime Started on {}",
            addr.to_socket_addrs()?.next().unwrap_or(SocketAddr::from(([0, 0, 0, 0], 80)))
        );

        let listener = TcpListener::bind(addr)?;

        while self.running.load(Ordering::SeqCst) {
            match listener.accept() {
                Ok((stream, addr)) => {
                    #[cfg(feature = "log")]
                    debug!("New connection from {}", addr);
                    let service = self.service.clone();
                    let config = self.config.clone();

                    // Spawn a new coroutine for this connection with panic handling
                    may::go!(move || {
                        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| Self::conn_handler(stream, service, config)));

                        match result {
                            Ok(Ok(())) => (), // Connection completed successfully
                            Ok(Err(e)) => {
                                #[cfg(feature = "log")]
                                log::error!("Connection handler error: {}", e);
                            }
                            Err(e) => {
                                let msg = e.downcast_ref::<String>().map(|s| s.as_str()).unwrap_or("Unknown panic");
                                #[cfg(feature = "log")]
                                log::error!("Connection handler panic: {}", msg);
                            }
                        }
                    });
                }
                Err(e) => {
                    #[cfg(feature = "log")]
                    warn!("Failed to accept connection: {}", e);
                }
            }
        }

        #[cfg(feature = "log")]
        info!("Server shutting down");
        Ok(())
    }

    /// Helper to send basic HTTP errors with proper headers
    fn send_error(stream: &mut TcpStream, status: StatusCode, message: &str) -> io::Result<()> {
        let mut response = Response::default();
        response.set_status(status.as_u16());
        response.send_text(message);

        // Add standard security headers
        response.add_header("X-Content-Type-Options", "nosniff").ok();
        response.add_header("X-Frame-Options", "DENY").ok();

        // Always close connection on error
        response.add_header("Connection", "close").ok();

        stream.write_all(&response.to_raw())
    }

    /// The main coroutine function: reads, dispatches, and manages stream lifecycle.
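    ///
    /// Requests are read and dispatched in a loop for as long as keep-alive
    /// applies; any bytes read past the current request's body are carried
    /// over in `pipeline_buffer` for the next iteration (naive pipelining).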
    fn conn_handler(mut stream: TcpStream, service: ArcService, config: ServerConfig) -> io::Result<()> {
        let mut keep_alive = true;
        let mut pipeline_buffer: Vec<u8> = Vec::new();
        let remote_addr = stream.peer_addr()?; // the connected client's address
        while keep_alive {
            stream.set_read_timeout(Some(std::time::Duration::from_secs(config.read_timeout_secs)))?;

            let body = pipeline_buffer;
            pipeline_buffer = Vec::new();
            /* =========================
             * 1. READ HEADERS
             * ========================= */
            let mut buffer = body;
            let mut temp = [0u8; 4096];

            loop {
                let prev_len = buffer.len();
                let n = stream.read(&mut temp)?;
                if n == 0 {
                    return Ok(()); // client closed connection
                }

                buffer.extend_from_slice(&temp[..n]);

                // Check for boundary, starting from up to 3 bytes before new data
                // to catch boundaries split across reads
                let check_from = prev_len.saturating_sub(3);
                if buffer[check_from..].windows(4).any(|w| w == b"\r\n\r\n") {
                    break;
                }

                if buffer.len() > config.max_body_size {
                    Self::send_error(&mut stream, StatusCode::PAYLOAD_TOO_LARGE, "Headers too large")?;
                    return Ok(());
                }
            }

            let header_end = buffer.windows(4).position(|w| w == b"\r\n\r\n").unwrap() + 4;

            let headers_raw = &buffer[..header_end];
            let mut body = buffer[header_end..].to_vec();

            /* =========================
             * 2. PARSE HEADERS ONLY
             * ========================= */
            let temp_request = match Request::parse(headers_raw, Bytes::new(), remote_addr) {
                Ok(r) => r,
                Err(e) => {
                    Self::send_error(&mut stream, StatusCode::BAD_REQUEST, &format!("Invalid request: {}", e))?;
                    return Ok(());
                }
            };
            /* =========================
             * 3. REJECT CHUNKED ENCODING
             * ========================= */

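            // Body framing below relies solely on Content-Length, so a chunked
            // request cannot be read correctly and is refused.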
            if temp_request
                .headers
                .get(http::header::TRANSFER_ENCODING)
                .map(|v| v.as_bytes().eq_ignore_ascii_case(b"chunked"))
                .unwrap_or(false)
            {
                Self::send_error(&mut stream, StatusCode::NOT_IMPLEMENTED, "Chunked transfer encoding not supported")?;
                return Ok(());
            }

            /* =========================
             * 4. HANDLE CONNECTION HEADER
             * ========================= */
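            // HTTP/1.1 defaults to keep-alive unless the client explicitly sends
            // `Connection: close`; any other version is treated as close-after-response.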
            keep_alive = match (temp_request.version, temp_request.headers.get(http::header::CONNECTION)) {
                (http::Version::HTTP_11, Some(v)) if v.as_bytes().eq_ignore_ascii_case(b"close") => false,
                (http::Version::HTTP_11, _) => true,
                _ => false,
            };

            /* =========================
             * 5. READ BODY (Content-Length)
             * ========================= */

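            // A missing or unparsable Content-Length is treated as an empty body.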
            let content_length = temp_request
                .headers
                .get(http::header::CONTENT_LENGTH)
                .and_then(|v| v.to_str().ok())
                .and_then(|v| v.parse::<usize>().ok())
                .unwrap_or(0);

            if content_length > config.max_body_size {
                Self::send_error(&mut stream, StatusCode::PAYLOAD_TOO_LARGE, "Request body too large")?;
                return Ok(());
            }

            // If we already read past this request's body, save the excess for the next pipelined request
            if body.len() > content_length {
                pipeline_buffer = body.split_off(content_length);
            }

            while body.len() < content_length {
                let n = stream.read(&mut temp)?;
                if n == 0 {
                    Self::send_error(&mut stream, StatusCode::BAD_REQUEST, "Unexpected EOF while reading request body")?;
                    return Ok(());
                }

                body.extend_from_slice(&temp[..n]);
            }
            if body.len() > content_length {
                pipeline_buffer = body.split_off(content_length);
            }

            /* =========================
             * 6. BUILD FINAL REQUEST
             * ========================= */
            let request = match Request::parse(headers_raw, Bytes::from(body), remote_addr) {
                Ok(r) => r,
                Err(e) => {
                    Self::send_error(&mut stream, StatusCode::BAD_REQUEST, &format!("Invalid request: {}", e))?;
                    return Ok(());
                }
            };

            /* =========================
             * 7. DISPATCH
             * ========================= */
            let result = service.handle(request, None);

            match result {
                Ok(ServiceResult::Response(response)) => {
                    let raw = response.to_raw();
                    stream.write_all(&raw)?;
                    stream.flush()?;
                    if !keep_alive {
                        return Ok(());
                    }
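                    // A handler-supplied `Connection: close` also ends the keep-alive loop.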
                    if let Some(conn) = response.headers.get(http::header::CONNECTION) {
                        if conn.as_bytes().eq_ignore_ascii_case(b"close") {
                            return Ok(());
                        }
                    }
                }

                Ok(ServiceResult::Consumed) => return Ok(()),

                Err(e) => {
                    Self::send_error(&mut stream, StatusCode::INTERNAL_SERVER_ERROR, &format!("Internal error: {}", e))?;
                    return Ok(());
                }
            }
        }

        Ok(())
    }
}