feather_runtime/runtime/
engine.rs1use crate::http::{Request, Response};
2use crate::utils::{Message, Queue};
3use std::io::{self, BufReader, BufWriter, Read, Result as IoResult, Write};
4use std::cell::RefCell;
5use std::net::{Ipv4Addr, ToSocketAddrs};
6use std::sync::Arc;
7use std::sync::atomic::{AtomicBool, Ordering};
8use may::go;
9use std::net::TcpListener;
10use std::net::SocketAddr;
11use socket2::{Socket, Domain, Type, Protocol};
12use may::Config;
13
/// Configuration accepted by [`Engine::with_config`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EngineConfig {
    /// IPv4 address and port the engine binds its listening socket to.
    pub address: (Ipv4Addr, u16),
}
18
19pub struct Engine {
22 listener: TcpListener,
23 messages: Arc<Queue<Message>>,
24 shutdown_flag: Arc<AtomicBool>,
25}
26
27impl Engine {
28 pub fn new(addr: impl ToSocketAddrs) -> Self {
30 Config.set_pool_capacity(200);
32 Config.set_stack_size(256 * 1024);
33
34 let addr = addr.to_socket_addrs().unwrap().next().expect("Invalid address");
36 let domain = match addr {
37 SocketAddr::V4(_) => Domain::IPV4,
38 SocketAddr::V6(_) => Domain::IPV6,
39 };
40 let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP)).expect("Failed to create socket");
41 socket.set_reuse_address(true).ok();
42 #[cfg(unix)]
43 socket.set_reuse_port(true).ok();
44 socket.set_nodelay(true).ok();
45 socket.set_recv_buffer_size(1 << 20).ok(); socket.set_send_buffer_size(1 << 20).ok(); let backlog = 2048;
48 socket.bind(&addr.into()).expect("Failed to bind socket");
49 socket.listen(backlog).expect("Failed to listen on socket");
50 let listener: TcpListener = socket.into();
51 listener.set_nonblocking(true).expect("Failed to set nonblocking");
52
53 let messages = Queue::with_capacity(256);
54 let shutdown_flag = Arc::new(AtomicBool::new(false));
55 let server = Self {
56 listener,
57 messages: Arc::new(messages),
58 shutdown_flag,
59 };
60 server
61 }
62 pub fn with_config(config: EngineConfig) -> Self {
64 let addr = SocketAddr::from(config.address);
66 let domain = match addr {
67 SocketAddr::V4(_) => Domain::IPV4,
68 SocketAddr::V6(_) => Domain::IPV6,
69 };
70 let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP)).expect("Failed to create socket");
71 socket.set_reuse_address(true).ok();
72 #[cfg(unix)]
73 socket.set_reuse_port(true).ok();
74 socket.set_nodelay(true).ok();
75 socket.set_recv_buffer_size(1 << 20).ok();
76 socket.set_send_buffer_size(1 << 20).ok();
77 let backlog = 2048;
78 socket.bind(&addr.into()).expect("Failed to bind socket");
79 socket.listen(backlog).expect("Failed to listen on socket");
80 let listener: TcpListener = socket.into();
81 listener.set_nonblocking(true).expect("Failed to set nonblocking");
82
83 let messages = Queue::with_capacity(500);
84 let shutdown_flag = Arc::new(AtomicBool::new(false));
85 let server = Self {
86 listener,
87 messages: Arc::new(messages),
88 shutdown_flag,
89 };
90 server
91 }
92 pub fn spawn(&self, task: impl FnOnce() + Send + 'static) {
94 go!(task);
95 }
96 pub fn shutdown(&self) {
102 self.messages.unblock();
103 self.shutdown_flag.store(true, Ordering::Relaxed);
104 }
105
106 pub fn start(&self) {
110 let inside_closer = self.shutdown_flag.clone();
111 let inside_queue = self.messages.clone();
112 let server = self.listener.try_clone().unwrap();
113 go!(move || {
115 #[cfg(feature = "log")]
116 log::debug!("Acceptor thread started");
117
118 use std::thread::sleep;
119 use std::time::Duration;
120
121 thread_local! {
123 static BUFFER_POOL: RefCell<Vec<u8>> = RefCell::new(vec![0u8; 4096]);
124 }
125 while !inside_closer.load(Ordering::Relaxed) {
126 match server.accept() {
127 Ok((stream, _)) => {
128 let _ = stream.set_nodelay(true);
130 let _ = stream.set_read_timeout(Some(Duration::from_secs(25)));
131 let _ = stream.set_write_timeout(Some(Duration::from_secs(25)));
132 let inside_queue = inside_queue.clone();
133 go!(move || {
134 let mut buf_reader = BufReader::with_capacity(4096, stream.try_clone().unwrap());
135 BUFFER_POOL.with(|buf_cell| {
136 let mut buffer = buf_cell.borrow_mut();
137 loop {
138 buffer.fill(0);
139
140 match buf_reader.read(&mut buffer[..]) {
141 Ok(0) => {
142 break;
144 }
145 Ok(n) => {
146 if let Ok(mut request) = Request::parse(&buffer[..n]) {
147
148 if let Ok(write_socket) = stream.try_clone() {
149 request.set_stream(write_socket);
150 }
151 inside_queue.push(Message::Request(request))
152 }
153 }
154 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
155 #[cfg(feature = "log")]
156 log::debug!("Connection timed out");
157 let _ = stream.shutdown(std::net::Shutdown::Both);
158 break;
159 }
160 Err(ref e) if e.kind() == io::ErrorKind::ConnectionReset => {
161 #[cfg(feature = "log")]
162 log::debug!("Connection reset by peer");
163 let _ = stream.shutdown(std::net::Shutdown::Both);
164 break;
165 }
166 Err(e) => {
167 #[cfg(feature = "log")]
168 log::debug!("Error reading stream: {}", e);
169 break;
170 }
171 }
172 }
173 });
174 });
175 }
176 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
177 sleep(Duration::from_millis(1));
179 continue;
180 }
181 Err(e) => {
182 #[cfg(feature = "log")]
183 log::debug!("Error accepting connection: {}", e);
184 sleep(Duration::from_millis(1));
185 continue;
186 }
187 }
188 }
189 #[cfg(feature = "log")]
190 log::debug!("Acceptor thread shutting down");
191 });
192 }
193
194 pub fn recv(&self) -> IoResult<Request> {
198 match self.messages.pop() {
199 Some(Message::Error(e)) => Err(e),
200 Some(Message::Request(r)) => Ok(r),
201 None => Err(io::Error::new(io::ErrorKind::Other, "No message available")),
202 }
203 }
204 pub fn address(&self) -> String {
206 self.listener.local_addr().unwrap().to_string()
207 }
208 pub fn unblock(&self) {
211 self.messages.unblock();
212 }
213
214 pub fn for_each<F>(self, mut handle: F) -> io::Result<()>
219 where
220 F: FnMut(&mut Request) -> Response,
221 {
222 let engine = self;
223
224 loop {
225
226 match engine.recv() {
227 Ok(mut request) => {
228
229 let is_close = matches!(request.connection, Some(crate::http::ConnectionState::Close));
230 let connection_header = if is_close { "close" } else { "keep-alive" };
231 let mut response = handle(&mut request);
232 response.add_header("connection", connection_header);
233 if let Some(mut stream) = request.take_stream() {
234 let mut writer = BufWriter::new(&mut stream);
235 writer.write_all(&response.to_raw())?;
236 match writer.flush() {
237 Ok(_) => {}
238 Err(e) if e.kind() == io::ErrorKind::BrokenPipe => {
239 #[cfg(feature = "log")]
240 log::debug!("Client disconnected");
241 continue;
242 }
243 Err(e) => {
244 #[cfg(feature = "log")]
245 log::debug!("Error writing response: {}", e);
246 break;
247 }
248 };
249 if is_close {
250 break;
251 }
252 }
253 }
254 Err(e) => {
255 #[cfg(feature = "log")]
256 log::debug!("Error receiving message: {}", e);
257 break;
258 }
259 }
260 }
261 Ok(())
262 }
263}
264
265impl Drop for Engine {
266 fn drop(&mut self) {
267 self.shutdown();
268 }
269}