feather_runtime/runtime/
engine.rs

1use crate::http::{Request, Response};
2use crate::utils::{Message, Queue};
3use std::io::{self, BufReader, BufWriter, Read, Result as IoResult, Write};
4use std::cell::RefCell;
5use std::net::{Ipv4Addr, ToSocketAddrs};
6use std::sync::Arc;
7use std::sync::atomic::{AtomicBool, Ordering};
8use may::go;
9use std::net::TcpListener;
10use std::net::SocketAddr;
11use socket2::{Socket, Domain, Type, Protocol};
12use may::Config;
13
14pub struct EngineConfig {
15    /// The Address of the Engine(IP,Port)
16    pub address: (Ipv4Addr, u16),
17}
18
19/// The Engine is the main struct of the Runtime.  
20/// It drives the runtime Accepting Requests Answering them etc.
21pub struct Engine {
22    listener: TcpListener,
23    messages: Arc<Queue<Message>>,
24    shutdown_flag: Arc<AtomicBool>,
25}
26
27impl Engine {
28    /// Creates a new `Engine` instance without a config
29    pub fn new(addr: impl ToSocketAddrs) -> Self {
30        // Bigger Pools for better Concurency
31        Config.set_pool_capacity(200);
32        Config.set_stack_size(256 * 1024);
33        
34        // Advanced socket2 usage for tuning
35        let addr = addr.to_socket_addrs().unwrap().next().expect("Invalid address");
36        let domain = match addr {
37            SocketAddr::V4(_) => Domain::IPV4,
38            SocketAddr::V6(_) => Domain::IPV6,
39        };
40        let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP)).expect("Failed to create socket");
41        socket.set_reuse_address(true).ok();
42        #[cfg(unix)]
43        socket.set_reuse_port(true).ok();
44        socket.set_nodelay(true).ok();
45        socket.set_recv_buffer_size(1 << 20).ok(); // 1MB
46        socket.set_send_buffer_size(1 << 20).ok(); // 1MB
47        let backlog = 2048;
48        socket.bind(&addr.into()).expect("Failed to bind socket");
49        socket.listen(backlog).expect("Failed to listen on socket");
50        let listener: TcpListener = socket.into();
51        listener.set_nonblocking(true).expect("Failed to set nonblocking");
52
53        let messages = Queue::with_capacity(256); 
54        let shutdown_flag = Arc::new(AtomicBool::new(false));
55        let server = Self {
56            listener,
57            messages: Arc::new(messages),
58            shutdown_flag,
59        };
60        server
61    }
62    /// Creates a new `Engine` instance with a config
63    pub fn with_config(config: EngineConfig) -> Self {
64        // Advanced socket2 usage for tuning
65        let addr = SocketAddr::from(config.address);
66        let domain = match addr {
67            SocketAddr::V4(_) => Domain::IPV4,
68            SocketAddr::V6(_) => Domain::IPV6,
69        };
70        let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP)).expect("Failed to create socket");
71        socket.set_reuse_address(true).ok();
72        #[cfg(unix)]
73        socket.set_reuse_port(true).ok();
74        socket.set_nodelay(true).ok();
75        socket.set_recv_buffer_size(1 << 20).ok();
76        socket.set_send_buffer_size(1 << 20).ok();
77        let backlog = 2048;
78        socket.bind(&addr.into()).expect("Failed to bind socket");
79        socket.listen(backlog).expect("Failed to listen on socket");
80        let listener: TcpListener = socket.into();
81        listener.set_nonblocking(true).expect("Failed to set nonblocking");
82
83        let messages = Queue::with_capacity(500);
84        let shutdown_flag = Arc::new(AtomicBool::new(false));
85        let server = Self {
86            listener,
87            messages: Arc::new(messages),
88            shutdown_flag,
89        };
90        server
91    }
92    /// Add a new task to the internal TaskPool works like `thread::spawn` but its managed bythe Engine
93    pub fn spawn(&self, task: impl FnOnce() + Send + 'static) {
94        go!(task);
95    }
96    /// Trigger the shutdown flag to stop the Engine.
97    /// This method will unblock the thread that is waiting for a message.
98    /// It will also stop the acceptor thread.
99    /// This method should be called when the Engine is shutting down.
100    /// Its Called when the Engine is dropped.
101    pub fn shutdown(&self) {
102        self.messages.unblock();
103        self.shutdown_flag.store(true, Ordering::Relaxed);
104    }
105
106    /// Starts Acceptor thread.
107    /// This thread will accept incoming connections and push them to the queue.
108    /// The thread will run until the Engine is shutdown.
109    pub fn start(&self) {
110        let inside_closer = self.shutdown_flag.clone();
111        let inside_queue = self.messages.clone();
112        let server = self.listener.try_clone().unwrap();
113        // Start the Acceptor thread
114        go!(move || {
115            #[cfg(feature = "log")]
116            log::debug!("Acceptor thread started");
117
118            use std::thread::sleep;
119            use std::time::Duration;
120
121            //* Thread-local buffer pool for connection handlers
122            thread_local! {
123                static BUFFER_POOL: RefCell<Vec<u8>> = RefCell::new(vec![0u8; 4096]);
124            }
125            while !inside_closer.load(Ordering::Relaxed) {
126                match server.accept() {
127                    Ok((stream, _)) => {
128                        // Socket tuning: TCP_NODELAY, buffer size, timeout
129                        let _ = stream.set_nodelay(true);
130                        let _ = stream.set_read_timeout(Some(Duration::from_secs(25)));
131                        let _ = stream.set_write_timeout(Some(Duration::from_secs(25)));
132                        let inside_queue = inside_queue.clone();
133                        go!(move || {
134                            let mut buf_reader = BufReader::with_capacity(4096, stream.try_clone().unwrap());
135                            BUFFER_POOL.with(|buf_cell| {
136                                let mut buffer = buf_cell.borrow_mut();
137                                loop {
138                                    buffer.fill(0);
139                                    
140                                    match buf_reader.read(&mut buffer[..]) {
141                                        Ok(0) => {
142                                            // Connection closed
143                                            break;
144                                        }
145                                        Ok(n) => {
146                                            if let Ok(mut request) = Request::parse(&buffer[..n]) {
147                                                
148                                                if let Ok(write_socket) = stream.try_clone() {
149                                                    request.set_stream(write_socket);
150                                                }
151                                                inside_queue.push(Message::Request(request))
152                                            }
153                                        }
154                                        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
155                                            #[cfg(feature = "log")]
156                                            log::debug!("Connection timed out");
157                                            let _ = stream.shutdown(std::net::Shutdown::Both);
158                                            break;
159                                        }
160                                        Err(ref e) if e.kind() == io::ErrorKind::ConnectionReset => {
161                                            #[cfg(feature = "log")]
162                                            log::debug!("Connection reset by peer");
163                                            let _ = stream.shutdown(std::net::Shutdown::Both);
164                                            break;
165                                        }
166                                        Err(e) => {
167                                            #[cfg(feature = "log")]
168                                            log::debug!("Error reading stream: {}", e);
169                                            break;
170                                        }
171                                    }
172                                }
173                            });
174                        });
175                    }
176                    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
177                        //? Non-blocking accept: avoid busy-wait 
178                        sleep(Duration::from_millis(1));
179                        continue;
180                    }
181                    Err(e) => {
182                        #[cfg(feature = "log")]
183                        log::debug!("Error accepting connection: {}", e);
184                        sleep(Duration::from_millis(1));
185                        continue;
186                    }
187                }
188            }
189            #[cfg(feature = "log")]
190            log::debug!("Acceptor thread shutting down");
191        });
192    }
193
194    /// Blocks until a message is available to receive.
195    /// If the queue is empty, it will wait until a message is available.
196    /// If the queue is unblocked, it will return an error.
197    pub fn recv(&self) -> IoResult<Request> {
198        match self.messages.pop() {
199            Some(Message::Error(e)) => Err(e),
200            Some(Message::Request(r)) => Ok(r),
201            None => Err(io::Error::new(io::ErrorKind::Other, "No message available")),
202        }
203    }
204    /// Returns the address the Engine is Bound to.  
205    pub fn address(&self) -> String {
206        self.listener.local_addr().unwrap().to_string()
207    }
208    /// Unblocks the thread that is waiting for a message.
209    /// this medhod allows graceful shutdown of the Engine's Runtime.
210    pub fn unblock(&self) {
211        self.messages.unblock();
212    }
213
214    /// Iterates over incoming requests and handles them using the provided closure.
215    /// The closure should take a `HttpRequest` and return a `HttpResponse`.
216    /// This method will block until a request is available.
217    /// It will also handle the response and write it to the stream.
218    pub fn for_each<F>(self, mut handle: F) -> io::Result<()>
219    where
220        F: FnMut(&mut Request) -> Response,
221    {
222        let engine = self;
223        
224        loop {
225            
226            match engine.recv() {
227                Ok(mut request) => {
228                   
229                    let is_close = matches!(request.connection, Some(crate::http::ConnectionState::Close));
230                    let connection_header = if is_close { "close" } else { "keep-alive" };
231                    let mut response = handle(&mut request);
232                    response.add_header("connection", connection_header);
233                    if let Some(mut stream) = request.take_stream() {
234                        let mut writer = BufWriter::new(&mut stream);
235                        writer.write_all(&response.to_raw())?;
236                        match writer.flush() {
237                            Ok(_) => {}
238                            Err(e) if e.kind() == io::ErrorKind::BrokenPipe => {
239                                #[cfg(feature = "log")]
240                                log::debug!("Client disconnected");
241                                continue;
242                            }
243                            Err(e) => {
244                                #[cfg(feature = "log")]
245                                log::debug!("Error writing response: {}", e);
246                                break;
247                            }
248                        };
249                        if is_close {
250                            break;
251                        }
252                    }
253                }
254                Err(e) => {
255                    #[cfg(feature = "log")]
256                    log::debug!("Error receiving message: {}", e);
257                    break;
258                }
259            }
260        }
261        Ok(())
262    }
263}
264
265impl Drop for Engine {
266    fn drop(&mut self) {
267        self.shutdown();
268    }
269}