feather_runtime/runtime/server.rs

use http::StatusCode;
#[cfg(feature = "log")]
use log::{debug, info, warn};
use may::net::{TcpListener, TcpStream};
use num_cpus;
use std::io::{self, Read, Write};
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::atomic::{AtomicBool, Ordering};
use std::{panic, sync::Arc};

use crate::http::{Request, Response};
use crate::runtime::service::{ArcService, Service, ServiceResult};

/// Configuration for the HTTP server
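///
/// Fields can be overridden selectively with struct update syntax. The
/// snippet below is only an illustrative sketch; the values shown are not
/// recommendations:
///
/// ```ignore
/// let config = ServerConfig {
///     max_body_size: 16 * 1024,     // allow larger request bodies
///     ..ServerConfig::default()     // keep the remaining defaults
/// };
/// ```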
#[derive(Clone, Debug)]
pub struct ServerConfig {
    /// Maximum request body size in bytes (default: 8192 = 8KB)
    pub max_body_size: usize,
    /// Read timeout in seconds (default: 30)
    pub read_timeout_secs: u64,
    /// Number of worker threads (default: number of CPU cores)
    pub workers: usize,
    /// Stack size per coroutine in bytes (default: 65536 = 64KB)
    pub stack_size: usize,
}

impl Default for ServerConfig {
    fn default() -> Self {
        Self {
            max_body_size: 8192,
            read_timeout_secs: 30,
            workers: num_cpus::get(),
            stack_size: 64 * 1024,
        }
    }
}

/// An HTTP server that handles incoming connections using coroutines
pub struct Server {
    /// The user's application logic
    service: ArcService,
    /// Flag to control server shutdown
    running: Arc<AtomicBool>,
    /// Server configuration
    config: ServerConfig,
}

impl Server {
    /// Create a new Server instance with the given Service and maximum request body size
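    ///
    /// A minimal usage sketch; `MyService` is a placeholder for any type
    /// implementing [`Service`] and is not defined in this crate:
    ///
    /// ```ignore
    /// let server = Server::new(MyService, 8192);
    /// ```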
    pub fn new(service: impl Service, max_body_size: usize) -> Self {
        let config = ServerConfig {
            max_body_size,
            ..ServerConfig::default()
        };
        Self {
            service: Arc::new(service),
            running: Arc::new(AtomicBool::new(true)),
            config,
        }
    }

    /// Create a new Server instance with custom configuration
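    ///
    /// Illustrative only; `MyService` stands in for a [`Service`] implementation:
    ///
    /// ```ignore
    /// let config = ServerConfig { workers: 4, ..ServerConfig::default() };
    /// let server = Server::with_config(MyService, config);
    /// ```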
    pub fn with_config(service: impl Service, config: ServerConfig) -> Self {
        Self {
            service: Arc::new(service),
            running: Arc::new(AtomicBool::new(true)),
            config,
        }
    }

    /// Initiates a graceful shutdown of the server
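    ///
    /// The running flag is only re-checked between `accept()` calls, so the
    /// shutdown takes effect once the next connection (or accept error)
    /// arrives. A sketch of triggering it from another thread, assuming the
    /// server is shared behind an `Arc` and `MyService` implements [`Service`]:
    ///
    /// ```ignore
    /// let server = Arc::new(Server::new(MyService, 8192));
    /// let handle = server.clone();
    /// std::thread::spawn(move || handle.shutdown());
    /// server.run("127.0.0.1:8080").unwrap();
    /// ```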
    pub fn shutdown(&self) {
        self.running.store(false, Ordering::SeqCst);
    }

    /// Runs the server until shutdown is called
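    ///
    /// Note that the worker count and stack size are applied through
    /// `may::config()`, which configures the process-wide coroutine runtime
    /// rather than just this server instance.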
    pub fn run(&self, addr: impl ToSocketAddrs) -> io::Result<()> {
        // Configure coroutine runtime
        may::config().set_workers(self.config.workers);
        may::config().set_stack_size(self.config.stack_size);
        #[cfg(feature = "log")]
        info!(
            "Feather Runtime Started on {}",
            addr.to_socket_addrs()?.next().unwrap_or(SocketAddr::from(([0, 0, 0, 0], 80)))
        );

        let listener = TcpListener::bind(addr)?;

        while self.running.load(Ordering::SeqCst) {
            match listener.accept() {
                Ok((stream, addr)) => {
                    #[cfg(feature = "log")]
                    debug!("New connection from {}", addr);
                    let service = self.service.clone();
                    let config = self.config.clone();

                    // Spawn a new coroutine for this connection with panic handling
                    may::go!(move || {
                        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| Self::conn_handler(stream, service, config)));

                        match result {
                            Ok(Ok(())) => (), // Connection completed successfully
                            Ok(Err(e)) => {
                                #[cfg(feature = "log")]
                                log::error!("Connection handler error: {}", e);
                            }
                            Err(e) => {
                                // Panic payloads are usually a &str or a String; fall back to a generic message otherwise
                                let msg = e
                                    .downcast_ref::<&str>()
                                    .copied()
                                    .or_else(|| e.downcast_ref::<String>().map(String::as_str))
                                    .unwrap_or("Unknown panic");
                                #[cfg(feature = "log")]
                                log::error!("Connection handler panic: {}", msg);
                            }
                        }
                    });
                }
                Err(e) => {
                    #[cfg(feature = "log")]
                    warn!("Failed to accept connection: {}", e);
                }
            }
        }

        #[cfg(feature = "log")]
        info!("Server shutting down");
        Ok(())
    }

    /// Helper to send basic HTTP errors with proper headers
    fn send_error(stream: &mut TcpStream, status: StatusCode, message: &str) -> io::Result<()> {
        let mut response = Response::default();
        response.set_status(status.as_u16());
        response.send_text(message);

        // Add standard security headers
        response.add_header("X-Content-Type-Options", "nosniff").ok();
        response.add_header("X-Frame-Options", "DENY").ok();

        // Always close connection on error
        response.add_header("Connection", "close").ok();

        stream.write_all(&response.to_raw())
    }

    /// The main coroutine function: reads, dispatches, and manages stream lifecycle.
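    ///
    /// Per connection the flow is: read a request into the buffer, parse it,
    /// dispatch it to the service, write the response, and loop while the
    /// connection is keep-alive.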
    fn conn_handler(mut stream: TcpStream, service: ArcService, config: ServerConfig) -> io::Result<()> {
        // The read buffer is exactly max_body_size bytes; the whole request must fit in a single read
        let mut buffer = vec![0u8; config.max_body_size];
        let mut keep_alive = true;

        while keep_alive {
            // 1. READ PHASE with timeout
            stream.set_read_timeout(Some(std::time::Duration::from_secs(config.read_timeout_secs)))?;
            let bytes_read = match stream.read(&mut buffer) {
                Ok(0) => return Ok(()), // Connection closed by the peer
                Ok(n) if n >= config.max_body_size => {
                    // A completely filled buffer means the request may be truncated; reject it
                    Self::send_error(&mut stream, StatusCode::PAYLOAD_TOO_LARGE, "Request body too large")?;
                    return Ok(());
                }
                Ok(n) => n,
                Err(e) => {
                    // Read timeouts surface as WouldBlock or TimedOut depending on the platform
                    if matches!(e.kind(), io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut) {
                        Self::send_error(&mut stream, StatusCode::REQUEST_TIMEOUT, "Request timed out")?;
                    }
                    return Err(e);
                }
            };

            // 2. PARSE PHASE
            let request = match Request::parse(&buffer[..bytes_read]) {
                Ok(req) => {
                    // Update keep_alive based on the HTTP version and Connection header
                    keep_alive = match (req.version, req.headers.get(http::header::CONNECTION)) {
                        (http::Version::HTTP_11, Some(v)) => v.as_bytes().eq_ignore_ascii_case(b"keep-alive"),
                        (http::Version::HTTP_11, None) => true, // HTTP/1.1 defaults to keep-alive
                        _ => false,                             // HTTP/1.0 and anything else is always closed here
                    };
                    req
                }
                Err(e) => {
                    Self::send_error(&mut stream, StatusCode::BAD_REQUEST, &format!("Invalid request: {}", e))?;
                    return Ok(());
                }
            };

            // 3. SERVICE DISPATCH PHASE (ownership transfer): the request is moved into the service

            let result = service.handle(request, None);

            // 4. HANDLE RESULT & I/O
            match result {
                Ok(ServiceResult::Response(response)) => {
                    // NOTE: the stream was never moved into the service above, so it is still
                    // owned here and the response can be written directly. If a future design
                    // hands the stream to the service, the service must return it (unless it
                    // reports Consumed) for this write to remain possible; otherwise enforcing
                    // `Connection: close` is the simple way out.

                    let raw_response = response.to_raw();
                    stream.write_all(&raw_response)?;
                    stream.flush()?;

                    // If the handler explicitly asked to close the connection, honor it
                    // regardless of what the request negotiated
                    if let Some(connection) = response.headers.get(http::header::CONNECTION) {
                        if connection.as_bytes().eq_ignore_ascii_case(b"close") {
                            return Ok(());
                        }
                    }

                    // No need to clear the buffer between requests: the next read starts at
                    // buffer[0] and its return value bounds the slice handed to the parser,
                    // so stale bytes past it are never observed.
                }

                Ok(ServiceResult::Consumed) => {
                    return Ok(());
                }

                Err(e) => {
                    Self::send_error(&mut stream, StatusCode::INTERNAL_SERVER_ERROR, &format!("Internal error: {}", e))?;
                    return Ok(());
                }
            }

            // If keep_alive is still true, the loop continues and the next iteration
            // re-reads into the same buffer; otherwise the while condition ends the connection.
        }
        Ok(())
    }
}