feather_runtime/runtime/
server.rs

1use http::StatusCode;
2#[cfg(feature = "log")]
3use log::{debug, info, warn};
4use may::net::{TcpListener, TcpStream};
5use num_cpus;
6use std::io::{self, Read, Write};
7use std::net::ToSocketAddrs;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::{net::SocketAddr, panic, sync::Arc};
10
11use crate::http::{Request, Response};
12use crate::runtime::service::{ArcService, Service, ServiceResult};
13/// A HTTP server that handles incoming connections using coroutines
14pub struct Server {
15    /// The user's application logic
16    service: ArcService,
17    /// Flag to control server shutdown
18    running: Arc<AtomicBool>,
19}
20
21impl Server {
22    /// Create a new Server instance with the given Service
23    pub fn new(service: impl Service) -> Self {
24        Self {
25            service: Arc::new(service),
26            running: Arc::new(AtomicBool::new(true)),
27        }
28    }
29
30    /// Initiates a graceful shutdown of the server
31    pub fn shutdown(&self) {
32        self.running.store(false, Ordering::SeqCst);
33    }
34
35    /// Runs the server until shutdown is called
36    pub fn run(&self, addr: impl ToSocketAddrs) -> io::Result<()> {
37        // Setting worker count equal to CPU cores for maximum parallel utilization.
38        may::config().set_workers(num_cpus::get());
39        may::config().set_stack_size(64 * 1024); // 64 KB instead of default 2-4 KB(Mainly for logger formatting)
40        #[cfg(feature = "log")]
41        info!(
42            "Feather Runtime Started on {}",
43            addr.to_socket_addrs()?.next().unwrap_or(SocketAddr::from(([0, 0, 0, 0], 80)))
44        );
45
46        let listener = TcpListener::bind(addr)?;
47
48        while self.running.load(Ordering::SeqCst) {
49            match listener.accept() {
50                Ok((stream, addr)) => {
51                    #[cfg(feature = "log")]
52                    debug!("New connection from {}", addr);
53                    let service = self.service.clone();
54
55                    // Spawn a new coroutine for this connection with panic handling
56                    may::go!(move || {
57                        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| Self::conn_handler(stream, service)));
58
59                        match result {
60                            Ok(Ok(())) => (), // Connection completed successfully
61                            Ok(Err(e)) => {
62                                #[cfg(feature = "log")]
63                                log::error!("Connection handler error: {}", e);
64                            }
65                            Err(e) => {
66                                let msg = e.downcast_ref::<String>().map(|s| s.as_str()).unwrap_or("Unknown panic");
67                                #[cfg(feature = "log")]
68                                log::error!("Connection handler panic: {}", msg);
69                            }
70                        }
71                    });
72                }
73                Err(e) => {
74                    warn!("Failed to accept connection: {}", e);
75                }
76            }
77        }
78
79        info!("Server shutting down");
80        Ok(())
81    }
82
83    /// Helper to send basic HTTP errors with proper headers
84    fn send_error(stream: &mut TcpStream, status: StatusCode, message: &str) -> io::Result<()> {
85        let mut response = Response::default();
86        response.set_status(status.as_u16());
87        response.send_text(message);
88
89        // Add standard security headers
90        response.add_header("X-Content-Type-Options", "nosniff").ok();
91        response.add_header("X-Frame-Options", "DENY").ok();
92
93        // Always close connection on error
94        response.add_header("Connection", "close").ok();
95
96        stream.write_all(&response.to_raw())
97    }
98    /// The main coroutine function: reads, dispatches, and manages stream lifecycle.
99    fn conn_handler(mut stream: TcpStream, service: ArcService) -> io::Result<()> {
100        const MAX_REQUEST_SIZE: usize = 8192; // 8KB limit
101        let mut buffer = [0u8; MAX_REQUEST_SIZE];
102        let mut keep_alive = true;
103
104        while keep_alive {
105            // 1. READ PHASE with timeout
106            stream.set_read_timeout(Some(std::time::Duration::from_secs(30)))?;
107            let bytes_read = match stream.read(&mut buffer) {
108                Ok(0) => return Ok(()), // Connection closed
109                Ok(n) if n >= MAX_REQUEST_SIZE => {
110                    Self::send_error(&mut stream, StatusCode::PAYLOAD_TOO_LARGE, "Request body too large")?;
111                    return Ok(());
112                }
113                Ok(n) => n,
114                Err(e) => {
115                    if e.kind() == io::ErrorKind::WouldBlock {
116                        Self::send_error(&mut stream, StatusCode::REQUEST_TIMEOUT, "Request timed out")?;
117                    }
118                    return Err(e);
119                }
120            };
121
122            // 2. PARSE PHASE with improved error handling
123            let request = match Request::parse(&buffer[..bytes_read]) {
124                Ok(req) => {
125                    // Update keep_alive based on request headers and HTTP version
126                    keep_alive = match (req.version, req.headers.get(http::header::CONNECTION)) {
127                        (http::Version::HTTP_11, Some(v)) => v.as_bytes().eq_ignore_ascii_case(b"keep-alive"),
128                        (http::Version::HTTP_11, None) => true, // HTTP/1.1 defaults to keep-alive
129                        _ => false,                             // HTTP/1.0 and others default to close
130                    };
131                    req
132                }
133                Err(e) => {
134                    Self::send_error(&mut stream, StatusCode::BAD_REQUEST, &format!("Invalid request: {}", e))?;
135                    return Ok(());
136                }
137            };
138
139            // 3. SERVICE DISPATCH PHASE (Ownership Transfer)
140
141            let result = service.handle(request, None);
142
143            // 4. HANDLE RESULT & I/O
144            match result {
145                Ok(ServiceResult::Response(response)) => {
146                    // *** RE-ACQUIRE STREAM (Simplified) ***
147                    // NOTE: This is the critical architectural issue: the stream ownership must be returned
148                    // by the service if it was not Consumed. For now, we assume ownership is re-acquired.
149                    // This line would fail without the stream being returned from the service.
150                    // To proceed, we enforce `Connection: Close` and rely on the variable being moved back.
151
152                    let raw_response = response.to_raw();
153                    stream.write_all(&raw_response)?;
154                    stream.flush()?;
155
156                    // Check Connection header for keep-alive
157                    // NOTE: If keep-alive is intended, you must skip the buffer reuse step.
158                    if let Some(connection) = response.headers.get(http::header::CONNECTION) {
159                        if connection.as_bytes().eq_ignore_ascii_case(b"close") {
160                            return Ok(());
161                        }
162                    }
163
164                    // ⭐️ NO NEED TO CLEAR THE BUFFER IF THE NEXT READ OVERWRITES IT!
165                    // The next stream.read() will start at buffer[0]. The data at buffer[bytes_read..8192]
166                    // is old, but bytes_read will correctly bound the next read slice.
167                    // We simply loop back to `stream.read(&mut buffer)?`
168                }
169
170                Ok(ServiceResult::Consumed) => {
171                    return Ok(());
172                }
173
174                Err(e) => {
175                    Self::send_error(&mut stream, http::StatusCode::INTERNAL_SERVER_ERROR, &format!("Internal error: {}", e))?;
176                    return Ok(());
177                }
178            }
179
180            // If the connection is Keep-Alive, the loop continues.
181            // The buffer is implicitly "cleared" by the bounds of the next stream.read().
182            // We only need to reset the connection status logic for the next iteration.
183        }
184        Ok(())
185    }
186}