1use std::{
2 net::SocketAddr,
3 sync::{atomic::AtomicBool, Arc},
4};
5
6use pillow_http::Request;
7
8use tokio::{
9 io::{AsyncWriteExt, Interest},
10 net::TcpListener,
11 sync::watch,
12};
13
14use crate::MainRouter;
15
/// A server handle: an already-bound TCP listener together with the address
/// information and lifecycle state that go with it.
///
/// Instances are created by `Server::new` and consumed by `Server::run`.
#[derive(Debug)]
pub struct Server {
    /// Sending half of the watch channel that carries the server's [`State`].
    /// Initialised with `State::Starting`; `run` replaces it with
    /// `State::Listening`.
    state: watch::Sender<State>,
    /// IPv4 address octets, exposed through `addr()`.
    addr: [u8; 4],

    /// TCP port, exposed through `port()`.
    port: u16,

    /// Socket address exposed through `socket_addr()` and printed by `run`.
    socket_addr: SocketAddr,

    /// The listening socket; `run` hands it to a `Listener`.
    listener: TcpListener,

    /// Shared boolean flag, initialised to `false`. Nothing in the visible
    /// code reads or sets it afterwards.
    // NOTE(review): presumably reserved for a graceful-shutdown signal — confirm.
    shutdown: Arc<AtomicBool>,
}
32
/// Lifecycle state published on the server's watch channel.
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum State {
    /// Initial value placed in the channel when the server is constructed.
    Starting,
    /// Published by `Server::run` just before the accept loop starts.
    Listening,
    /// Not published anywhere in the visible code.
    // NOTE(review): presumably signals that the server is stopping — confirm.
    Shutdown,
}
39
40impl std::default::Default for Server {
41 fn default() -> Self {
52 Self::new(3000).unwrap()
53 }
54}
55
56impl Server {
57 pub fn new(port: u16) -> Result<Self, std::io::Error> {
72 let addr = [127, 0, 0, 1];
73
74 let (state, _) = watch::channel(State::Starting);
75 let socket_addr = SocketAddr::from((addr, port.try_into().unwrap()));
76
77 let socket = tokio::net::TcpSocket::new_v4()?;
78
79 #[cfg(not(win))]
80 socket.set_reuseaddr(true)?;
81
82 match socket.bind(socket_addr) {
83 Ok(_) => {}
84 Err(_) => {
85 let socket_addr = SocketAddr::from((addr, port + 1));
86 socket.bind(socket_addr).unwrap();
87 }
88 };
89
90 let listener = socket.listen(1024)?;
91
92 let shutdown = Arc::new(AtomicBool::new(false));
93
94 Ok(Self {
95 state,
96 addr,
97 port,
98 socket_addr,
99 listener,
100 shutdown,
101 })
102 }
103
104 pub fn addr(&self) -> &[u8; 4] {
106 &self.addr
107 }
108
109 pub fn port(&self) -> &u16 {
111 &self.port
112 }
113
114 pub fn socket_addr(&self) -> &SocketAddr {
116 &self.socket_addr
117 }
118}
119
120impl Server {
121 pub async fn run(self, router: MainRouter) {
138 self.state.send_replace(State::Listening);
139
140 println!("Listening on http://{}/", &self.socket_addr);
141
142 let router = Arc::new(router);
143
144 let listener = Listener::new(self.listener, router);
145
146 listener.listen().await.unwrap();
147 }
148}
149
/// Accept loop state: a TCP listener plus the router used to answer requests.
struct Listener {
    /// Socket on which incoming connections are accepted.
    listener: TcpListener,
    /// Router shared (via `Arc`) with every spawned connection task.
    router: Arc<MainRouter>,
}
155
156impl Listener {
157 pub fn new(listener: TcpListener, router: Arc<MainRouter>) -> Self {
163 Self { listener, router }
164 }
165}
166
167impl Listener {
168 pub async fn listen<'a, 'b>(
170 &'a self,
171 ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'a>> {
172 loop {
173 match self.listener.accept().await {
174 Ok((mut stream, _client)) => {
175 let router_clone = self.router.clone();
176
177 tokio::task::spawn(async move {
178 if let Err(err) = Self::handle_connections(&mut stream, &router_clone).await
179 {
180 eprintln!("{}", err);
181 };
182 });
183 }
184 Err(err) => eprintln!("{}", err),
185 };
186 }
187 }
188
189 async fn handle_connections(
196 stream: &mut tokio::net::TcpStream,
197 router: &MainRouter,
198 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
199 let ready_readable = stream.ready(Interest::READABLE).await?;
200 let ready_writable = stream.ready(Interest::WRITABLE).await?;
201
202 let mut request: Request = Request::new_empty();
203
204 if ready_readable.is_readable() {
205 request = Self::read_stream(request, &stream);
206 };
207
208 if ready_writable.is_writable() {
209 Self::write_stream(stream, &mut request, &router).await?;
210 stream.flush().await?;
211 };
212
213 Ok(())
214 }
215
216 async fn write_stream(
224 stream: &mut tokio::net::TcpStream,
225 request: &Request,
226 router: &MainRouter,
227 ) -> Result<(), std::io::Error> {
228 let vec_response = router.routing(request);
229
230 for response in vec_response {
231 let headers = format!(
232 "{}{}\r\n\r\n",
233 response.get_status_line(),
234 response.get_headers()
235 );
236 let body = response.get_body();
237
238 stream.write_all(headers.as_bytes()).await?;
239 stream.write_all(body.as_bytes()).await?;
240 }
241 Ok(())
242 }
243
244 fn read_stream(mut _parser: Request, stream: &tokio::net::TcpStream) -> Request {
246 let mut data = vec![0; 1024];
247 match stream.try_read(&mut data) {
250 Ok(_) => {
251 _parser = Request::from_vec(&data).unwrap();
252
253 return _parser;
254 }
255 Err(e) => {
256 panic!("{}", e);
257 }
258 };
259 }
260}
261
262impl Listener {
263 async fn monitor_shutdown(mut shutdown_rx: watch::Receiver<State>) {
265 let _ = shutdown_rx.changed().await;
267 }
268
269 async fn await_shutdown(self) -> Result<(), std::io::Error> {
271 Ok(())
272 }
273}