// feather_runtime/runtime/server.rs
use http::StatusCode;
#[cfg(feature = "log")]
use log::{debug, info, warn};
use may::net::{TcpListener, TcpStream};
use num_cpus;
use std::any::Any;
use std::io::{self, Read, Write};
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use std::{panic, sync::Arc};

use crate::http::{Request, Response};
use crate::runtime::service::{ArcService, Service, ServiceResult};
13
/// Tunable settings for a [`Server`].
#[derive(Clone, Debug)]
pub struct ServerConfig {
    /// Size in bytes of the per-connection read buffer. A single read that
    /// fills this buffer is answered with `413 Payload Too Large`.
    pub max_body_size: usize,
    /// Read timeout, in seconds, applied to each accepted connection.
    pub read_timeout_secs: u64,
    /// Worker count passed to `may::config().set_workers` when the server runs.
    pub workers: usize,
    /// Stack size passed to `may::config().set_stack_size` when the server runs.
    pub stack_size: usize,
}
26
27impl Default for ServerConfig {
28 fn default() -> Self {
29 Self {
30 max_body_size: 8192,
31 read_timeout_secs: 30,
32 workers: num_cpus::get(),
33 stack_size: 64 * 1024,
34 }
35 }
36}
37
38pub struct Server {
40 service: ArcService,
42 running: Arc<AtomicBool>,
44 config: ServerConfig,
46}
47
48impl Server {
49 pub fn new(service: impl Service, max_body_size: usize) -> Self {
51 let mut config = ServerConfig::default();
52 config.max_body_size = max_body_size;
53 Self {
54 service: Arc::new(service),
55 running: Arc::new(AtomicBool::new(true)),
56 config,
57 }
58 }
59
60 pub fn with_config(service: impl Service, config: ServerConfig) -> Self {
62 Self {
63 service: Arc::new(service),
64 running: Arc::new(AtomicBool::new(true)),
65 config,
66 }
67 }
68
69 pub fn shutdown(&self) {
71 self.running.store(false, Ordering::SeqCst);
72 }
73
74 pub fn run(&self, addr: impl ToSocketAddrs) -> io::Result<()> {
76 may::config().set_workers(self.config.workers);
78 may::config().set_stack_size(self.config.stack_size);
79 #[cfg(feature = "log")]
80 info!(
81 "Feather Runtime Started on {}",
82 addr.to_socket_addrs()?.next().unwrap_or(SocketAddr::from(([0, 0, 0, 0], 80)))
83 );
84
85 let listener = TcpListener::bind(addr)?;
86
87 while self.running.load(Ordering::SeqCst) {
88 match listener.accept() {
89 Ok((stream, addr)) => {
90 #[cfg(feature = "log")]
91 debug!("New connection from {}", addr);
92 let service = self.service.clone();
93 let config = self.config.clone();
94
95 may::go!(move || {
97 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| Self::conn_handler(stream, service, config)));
98
99 match result {
100 Ok(Ok(())) => (), Ok(Err(e)) => {
102 #[cfg(feature = "log")]
103 log::error!("Connection handler error: {}", e);
104 }
105 Err(e) => {
106 let msg = e.downcast_ref::<String>().map(|s| s.as_str()).unwrap_or("Unknown panic");
107 #[cfg(feature = "log")]
108 log::error!("Connection handler panic: {}", msg);
109 }
110 }
111 });
112 }
113 Err(e) => {
114 #[cfg(feature = "log")]
115 warn!("Failed to accept connection: {}", e);
116 }
117 }
118 }
119
120 #[cfg(feature = "log")]
121 info!("Server shutting down");
122 Ok(())
123 }
124
125 fn send_error(stream: &mut TcpStream, status: StatusCode, message: &str) -> io::Result<()> {
127 let mut response = Response::default();
128 response.set_status(status.as_u16());
129 response.send_text(message);
130
131 response.add_header("X-Content-Type-Options", "nosniff").ok();
133 response.add_header("X-Frame-Options", "DENY").ok();
134
135 response.add_header("Connection", "close").ok();
137
138 stream.write_all(&response.to_raw())
139 }
140 fn conn_handler(mut stream: TcpStream, service: ArcService, config: ServerConfig) -> io::Result<()> {
142 let mut buffer = vec![0u8; config.max_body_size];
144 let mut keep_alive = true;
145
146 while keep_alive {
147 stream.set_read_timeout(Some(std::time::Duration::from_secs(config.read_timeout_secs)))?;
149 let bytes_read = match stream.read(&mut buffer) {
150 Ok(0) => return Ok(()), Ok(n) if n >= config.max_body_size => {
152 Self::send_error(&mut stream, StatusCode::PAYLOAD_TOO_LARGE, "Request body too large")?;
153 return Ok(());
154 }
155 Ok(n) => n,
156 Err(e) => {
157 if e.kind() == io::ErrorKind::WouldBlock {
158 Self::send_error(&mut stream, StatusCode::REQUEST_TIMEOUT, "Request timed out")?;
159 }
160 return Err(e);
161 }
162 };
163
164 let request = match Request::parse(&buffer[..bytes_read]) {
166 Ok(req) => {
167 keep_alive = match (req.version, req.headers.get(http::header::CONNECTION)) {
169 (http::Version::HTTP_11, Some(v)) => v.as_bytes().eq_ignore_ascii_case(b"keep-alive"),
170 (http::Version::HTTP_11, None) => true, _ => false, };
173 req
174 }
175 Err(e) => {
176 Self::send_error(&mut stream, StatusCode::BAD_REQUEST, &format!("Invalid request: {}", e))?;
177 return Ok(());
178 }
179 };
180
181 let result = service.handle(request, None);
184
185 match result {
187 Ok(ServiceResult::Response(response)) => {
188 let raw_response = response.to_raw();
195 stream.write_all(&raw_response)?;
196 stream.flush()?;
197
198 if let Some(connection) = response.headers.get(http::header::CONNECTION) {
201 if connection.as_bytes().eq_ignore_ascii_case(b"close") {
202 return Ok(());
203 }
204 }
205
206 }
211
212 Ok(ServiceResult::Consumed) => {
213 return Ok(());
214 }
215
216 Err(e) => {
217 Self::send_error(&mut stream, http::StatusCode::INTERNAL_SERVER_ERROR, &format!("Internal error: {}", e))?;
218 return Ok(());
219 }
220 }
221
222 }
226 Ok(())
227 }
228}