pillow_routing/
server.rs

1use std::{
2    net::SocketAddr,
3    sync::{atomic::AtomicBool, Arc},
4};
5
6use pillow_http::Request;
7
8use tokio::{
9    io::{AsyncWriteExt, Interest},
10    net::TcpListener,
11    sync::watch,
12};
13
14use crate::MainRouter;
15
16/// Server for you app
17#[derive(Debug)]
18pub struct Server {
19    state: watch::Sender<State>,
20    /// Address
21    addr: [u8; 4],
22
23    /// Port on listen
24    port: u16,
25
26    socket_addr: SocketAddr,
27
28    listener: TcpListener,
29
30    shutdown: Arc<AtomicBool>,
31}
32
33#[derive(Debug, PartialEq, Eq)]
34pub(crate) enum State {
35    Starting,
36    Listening,
37    Shutdown,
38}
39
40impl std::default::Default for Server {
41    /// Instance of Server
42    ///
43    /// # Examples
44    ///
45    /// ```rust
46    /// #[tokio::main]
47    /// async fn main(){
48    ///     let server = Server::default();
49    /// }
50    /// ```
51    fn default() -> Self {
52        Self::new(3000).unwrap()
53    }
54}
55
56impl Server {
57    /// Instance of Server
58    ///
59    /// # Arguments
60    ///
61    /// * port - port of your app
62    ///
63    /// # Examples
64    ///
65    /// ```rust
66    /// #[tokio::main]
67    /// async fn main(){
68    ///     let server = Server::new(3000).unwrap();
69    /// }
70    /// ```   
71    pub fn new(port: u16) -> Result<Self, std::io::Error> {
72        let addr = [127, 0, 0, 1];
73
74        let (state, _) = watch::channel(State::Starting);
75        let socket_addr = SocketAddr::from((addr, port.try_into().unwrap()));
76
77        let socket = tokio::net::TcpSocket::new_v4()?;
78
79        #[cfg(not(win))]
80        socket.set_reuseaddr(true)?;
81
82        match socket.bind(socket_addr) {
83            Ok(_) => {}
84            Err(_) => {
85                let socket_addr = SocketAddr::from((addr, port + 1));
86                socket.bind(socket_addr).unwrap();
87            }
88        };
89
90        let listener = socket.listen(1024)?;
91
92        let shutdown = Arc::new(AtomicBool::new(false));
93
94        Ok(Self {
95            state,
96            addr,
97            port,
98            socket_addr,
99            listener,
100            shutdown,
101        })
102    }
103
104    /// Reference of add
105    pub fn addr(&self) -> &[u8; 4] {
106        &self.addr
107    }
108
109    /// Reference of port
110    pub fn port(&self) -> &u16 {
111        &self.port
112    }
113
114    /// Reference of socket_addr
115    pub fn socket_addr(&self) -> &SocketAddr {
116        &self.socket_addr
117    }
118}
119
120impl Server {
121    /// Run you Server
122    ///
123    /// # Arguments
124    ///
125    /// * router - You MainRouter
126    ///
127    /// # Examples
128    ///
129    /// ```rust
130    /// #[tokio::main]
131    /// async main(){
132    ///     let mut router = MainRouter::new();
133    ///     let server = Server::default();
134    ///     server.run(router).await;
135    /// }
136    /// ```
137    pub async fn run(self, router: MainRouter) {
138        self.state.send_replace(State::Listening);
139
140        println!("Listening on http://{}/", &self.socket_addr);
141
142        let router = Arc::new(router);
143
144        let listener = Listener::new(self.listener, router);
145
146        listener.listen().await.unwrap();
147    }
148}
149
150/// Listener http
151struct Listener {
152    listener: TcpListener,
153    router: Arc<MainRouter>,
154}
155
156impl Listener {
157    /// Instance of Listener
158    ///
159    /// # Arguments
160    ///
161    /// * listener - TcpListener
162    pub fn new(listener: TcpListener, router: Arc<MainRouter>) -> Self {
163        Self { listener, router }
164    }
165}
166
167impl Listener {
168    /// Listen Listener
169    pub async fn listen<'a, 'b>(
170        &'a self,
171    ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'a>> {
172        loop {
173            match self.listener.accept().await {
174                Ok((mut stream, _client)) => {
175                    let router_clone = self.router.clone();
176
177                    tokio::task::spawn(async move {
178                        if let Err(err) = Self::handle_connections(&mut stream, &router_clone).await
179                        {
180                            eprintln!("{}", err);
181                        };
182                    });
183                }
184                Err(err) => eprintln!("{}", err),
185            };
186        }
187    }
188
189    /// Handle new connections
190    ///
191    /// # Arguments
192    ///
193    /// * stream - &TcpStream
194    /// * router - &MainRouter
195    async fn handle_connections(
196        stream: &mut tokio::net::TcpStream,
197        router: &MainRouter,
198    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
199        let ready_readable = stream.ready(Interest::READABLE).await?;
200        let ready_writable = stream.ready(Interest::WRITABLE).await?;
201
202        let mut request: Request = Request::new_empty();
203
204        if ready_readable.is_readable() {
205            request = Self::read_stream(request, &stream);
206        };
207
208        if ready_writable.is_writable() {
209            Self::write_stream(stream, &mut request, &router).await?;
210            stream.flush().await?;
211        };
212
213        Ok(())
214    }
215
216    /// Write data in stream
217    ///
218    /// # Arguments
219    ///
220    /// * stream - tokio TcpStream
221    /// * request - for Router
222    /// * router - MainRouter
223    async fn write_stream(
224        stream: &mut tokio::net::TcpStream,
225        request: &Request,
226        router: &MainRouter,
227    ) -> Result<(), std::io::Error> {
228        let vec_response = router.routing(request);
229
230        for response in vec_response {
231            let headers = format!(
232                "{}{}\r\n\r\n",
233                response.get_status_line(),
234                response.get_headers()
235            );
236            let body = response.get_body();
237
238            stream.write_all(headers.as_bytes()).await?;
239            stream.write_all(body.as_bytes()).await?;
240        }
241        Ok(())
242    }
243
244    /// Read data from stream and return a Request
245    fn read_stream(mut _parser: Request, stream: &tokio::net::TcpStream) -> Request {
246        let mut data = vec![0; 1024];
247        // Try to read data, this may still fail with `WouldBlock`
248        // if the readiness event is a false positive.
249        match stream.try_read(&mut data) {
250            Ok(_) => {
251                _parser = Request::from_vec(&data).unwrap();
252
253                return _parser;
254            }
255            Err(e) => {
256                panic!("{}", e);
257            }
258        };
259    }
260}
261
262impl Listener {
263    /// Monitor the shutdown signal and close the listener when received
264    async fn monitor_shutdown(mut shutdown_rx: watch::Receiver<State>) {
265        // Wait for the shutdown signal
266        let _ = shutdown_rx.changed().await;
267    }
268
269    /// Wait for the listener to finish
270    async fn await_shutdown(self) -> Result<(), std::io::Error> {
271        Ok(())
272    }
273}