feather_runtime/runtime/
server.rs1use bytes::Bytes;
2use http::StatusCode;
3#[cfg(feature = "log")]
4use log::{debug, info, warn};
5use may::net::{TcpListener, TcpStream};
6use num_cpus;
7use std::io::{self, Read, Write};
8use std::net::{SocketAddr, ToSocketAddrs};
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::{panic, sync::Arc};
11
12use crate::http::{Request, Response};
13use crate::runtime::service::{ArcService, Service, ServiceResult};
14
15#[derive(Clone, Debug)]
17pub struct ServerConfig {
18 pub max_body_size: usize,
20 pub read_timeout_secs: u64,
22 pub workers: usize,
24 pub stack_size: usize,
26}
27
28impl Default for ServerConfig {
29 fn default() -> Self {
30 Self {
31 max_body_size: 8192,
32 read_timeout_secs: 30,
33 workers: num_cpus::get(),
34 stack_size: 64 * 1024,
35 }
36 }
37}
38
39pub struct Server {
41 service: ArcService,
43 running: Arc<AtomicBool>,
45 config: ServerConfig,
47}
48
49impl Server {
50 pub fn new(service: impl Service, max_body_size: usize) -> Self {
52 let mut config = ServerConfig::default();
53 config.max_body_size = max_body_size;
54 Self {
55 service: Arc::new(service),
56 running: Arc::new(AtomicBool::new(true)),
57 config,
58 }
59 }
60
61 pub fn with_config(service: impl Service, config: ServerConfig) -> Self {
63 Self {
64 service: Arc::new(service),
65 running: Arc::new(AtomicBool::new(true)),
66 config,
67 }
68 }
69
70 pub fn shutdown(&self) {
72 self.running.store(false, Ordering::SeqCst);
73 }
74
75 pub fn run(&self, addr: impl ToSocketAddrs) -> io::Result<()> {
77 may::config().set_workers(self.config.workers);
79 may::config().set_stack_size(self.config.stack_size);
80 #[cfg(feature = "log")]
81 info!(
82 "Feather Runtime Started on {}",
83 addr.to_socket_addrs()?.next().unwrap_or(SocketAddr::from(([0, 0, 0, 0], 80)))
84 );
85
86 let listener = TcpListener::bind(addr)?;
87
88 while self.running.load(Ordering::SeqCst) {
89 match listener.accept() {
90 Ok((stream, addr)) => {
91 #[cfg(feature = "log")]
92 debug!("New connection from {}", addr);
93 let service = self.service.clone();
94 let config = self.config.clone();
95
96 may::go!(move || {
98 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| Self::conn_handler(stream, service, config)));
99
100 match result {
101 Ok(Ok(())) => (), Ok(Err(e)) => {
103 #[cfg(feature = "log")]
104 log::error!("Connection handler error: {}", e);
105 }
106 Err(e) => {
107 let msg = e.downcast_ref::<String>().map(|s| s.as_str()).unwrap_or("Unknown panic");
108 #[cfg(feature = "log")]
109 log::error!("Connection handler panic: {}", msg);
110 }
111 }
112 });
113 }
114 Err(e) => {
115 #[cfg(feature = "log")]
116 warn!("Failed to accept connection: {}", e);
117 }
118 }
119 }
120
121 #[cfg(feature = "log")]
122 info!("Server shutting down");
123 Ok(())
124 }
125
126 fn send_error(stream: &mut TcpStream, status: StatusCode, message: &str) -> io::Result<()> {
128 let mut response = Response::default();
129 response.set_status(status.as_u16());
130 response.send_text(message);
131
132 response.add_header("X-Content-Type-Options", "nosniff").ok();
134 response.add_header("X-Frame-Options", "DENY").ok();
135
136 response.add_header("Connection", "close").ok();
138
139 stream.write_all(&response.to_raw())
140 }
141
142 fn conn_handler(mut stream: TcpStream, service: ArcService, config: ServerConfig) -> io::Result<()> {
144 let mut keep_alive = true;
145 let mut pipeline_buffer: Vec<u8> = Vec::new();
146 let remote_addr = stream.local_addr()?;
147 while keep_alive {
148 stream.set_read_timeout(Some(std::time::Duration::from_secs(config.read_timeout_secs)))?;
149
150 let body = pipeline_buffer;
151 pipeline_buffer = Vec::new();
152 let mut buffer = body;
156 let mut temp = [0u8; 4096];
157
158 loop {
159 let prev_len = buffer.len();
160 let n = stream.read(&mut temp)?;
161 if n == 0 {
162 return Ok(()); }
164
165 buffer.extend_from_slice(&temp[..n]);
166
167 let check_from = prev_len.saturating_sub(3);
170 if buffer[check_from..].windows(4).any(|w| w == b"\r\n\r\n") {
171 break;
172 }
173
174 if buffer.len() > config.max_body_size {
175 Self::send_error(&mut stream, StatusCode::PAYLOAD_TOO_LARGE, "Headers too large")?;
176 return Ok(());
177 }
178 }
179
180 let header_end = buffer.windows(4).position(|w| w == b"\r\n\r\n").unwrap() + 4;
181
182 let headers_raw = &buffer[..header_end];
183 let mut body = buffer[header_end..].to_vec();
184
185 let temp_request = match Request::parse(headers_raw, Bytes::new(), remote_addr) {
189 Ok(r) => r,
190 Err(e) => {
191 Self::send_error(&mut stream, StatusCode::BAD_REQUEST, &format!("Invalid request: {}", e))?;
192 return Ok(());
193 }
194 };
195 if temp_request.headers.get(http::header::TRANSFER_ENCODING).map(|v| v.as_bytes().eq_ignore_ascii_case(b"chunked")).unwrap_or(false) {
200 Self::send_error(&mut stream, StatusCode::NOT_IMPLEMENTED, "Chunked transfer encoding not supported")?;
201 return Ok(());
202 }
203
204 keep_alive = match (temp_request.version, temp_request.headers.get(http::header::CONNECTION)) {
208 (http::Version::HTTP_11, Some(v)) if v.as_bytes().eq_ignore_ascii_case(b"close") => false,
209 (http::Version::HTTP_11, _) => true,
210 _ => false,
211 };
212
213 let content_length = temp_request.headers.get(http::header::CONTENT_LENGTH).and_then(|v| v.to_str().ok()).and_then(|v| v.parse::<usize>().ok()).unwrap_or(0);
218
219 if content_length > config.max_body_size {
220 Self::send_error(&mut stream, StatusCode::PAYLOAD_TOO_LARGE, "Request body too large")?;
221 return Ok(());
222 }
223
224 if body.len() > content_length {
226 pipeline_buffer = body.split_off(content_length);
227 }
228
229 while body.len() < content_length {
230 let n = stream.read(&mut temp)?;
231 if n == 0 {
232 Self::send_error(&mut stream, StatusCode::BAD_REQUEST, "Unexpected EOF while reading request body")?;
233 return Ok(());
234 }
235
236 body.extend_from_slice(&temp[..n]);
237 }
238 if body.len() > content_length {
239 pipeline_buffer = body.split_off(content_length);
240 }
241
242 let request = match Request::parse(headers_raw, Bytes::from(body), remote_addr) {
246 Ok(r) => r,
247 Err(e) => {
248 Self::send_error(&mut stream, StatusCode::BAD_REQUEST, &format!("Invalid request: {}", e))?;
249 return Ok(());
250 }
251 };
252
253 let result = service.handle(request, None);
257
258 match result {
259 Ok(ServiceResult::Response(response)) => {
260 let raw = response.to_raw();
261 stream.write_all(&raw)?;
262 stream.flush()?;
263 if !keep_alive {
264 return Ok(());
265 }
266 if let Some(conn) = response.headers.get(http::header::CONNECTION) {
267 if conn.as_bytes().eq_ignore_ascii_case(b"close") {
268 return Ok(());
269 }
270 }
271 }
272
273 Ok(ServiceResult::Consumed) => return Ok(()),
274
275 Err(e) => {
276 Self::send_error(&mut stream, StatusCode::INTERNAL_SERVER_ERROR, &format!("Internal error: {}", e))?;
277 return Ok(());
278 }
279 }
280 }
281
282 Ok(())
283 }
284}