1#![feature(iter_intersperse)]
2
3use std::cell::RefCell;
4use std::fmt::Display;
5use std::io::{self, Read, Write};
6use std::net::{TcpListener, TcpStream, SocketAddr};
7use std::rc::Rc;
8use std::sync::atomic::AtomicBool;
9use std::time::Duration;
10use std::error;
11use std::thread::{self, JoinHandle};
12use std::sync::Arc;
13
14pub mod context;
15pub mod cookie;
16pub mod middleware;
17pub mod request;
18pub mod response;
19pub mod router;
20pub mod session;
21pub mod util;
22
23pub use request::Request;
24use request::RequestError;
25pub use response::Response;
26pub use context::Context;
27use middleware::Middleware;
28use router::{Router, Handler};
29use session::SessionManager;
30use util::{strip_for_terminal, code_color};
31use uuid::Uuid;
32
33use chrono::Utc;
34use colored::*;
35use debug_print::debug_eprintln;
36
37#[derive(Debug)]
38pub enum ImmortalError<'a> {
39 Io(io::Error),
41 #[cfg(feature = "threading")]
43 Tpbe(rayon::ThreadPoolBuildError),
44 AcceptError(io::Error),
46 RequestError(RequestError<'a>),
47}
48
49impl Display for ImmortalError<'_> {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 write!(f, "{:?}", self)
52 }
53}
54impl error::Error for ImmortalError<'_> {}
55
56#[inline]
57fn log(stream: &TcpStream, req: Rc<RefCell<Request>>, resp: Rc<RefCell<Response>>, sent: usize) {
58 let remote_socket = match stream.peer_addr() {
59 Err(_) => "<no socket>".red().bold(),
60 Ok(s) => s.ip().to_string().normal(),
61 };
62
63 let now = Utc::now();
64 let date_time = now.format("%a, %d %b %Y %H:%M:%S").to_string();
65 let time_stamp = format!("[{:<17}]",
66 format!("{}.{}",
67 now.timestamp(),
68 now.timestamp_subsec_micros())
69 .bright_blue());
70
71 let method = match req.borrow().method {
72 "" => "<no method>".red().bold(),
73 _ => strip_for_terminal(req.borrow().method).normal(),
74 };
75
76 let document = match req.borrow().document {
77 "" => "<no document>".red().bold(),
78 _ => strip_for_terminal(&req.borrow().document).normal(),
79 };
80
81 let user_agent = match req.borrow_mut().header("User-Agent") {
82 None => "<no user-agent>".red().bold(),
83 Some(thing) => strip_for_terminal(thing).normal(),
84 };
85
86 println!("{} {} {} {}\t{} {}\t{}\t{}",
87 date_time,
88 time_stamp,
89 remote_socket,
90 method,
91 code_color(resp.borrow().code),
92 sent,
93 if req.borrow_mut().query_raw.is_empty() {
94 document
95 } else {
96 format!("{}?{}", document, &strip_for_terminal(&req.borrow_mut().query_raw)).normal()
97 },
98 user_agent);
99}
100
101#[inline]
102fn stream_write(stream: &mut TcpStream, request: Rc<RefCell<Request>>, response: Rc<RefCell<Response>>) {
103 let data = response.borrow_mut().serialize();
104 match stream.write(data.as_slice()) {
105 Ok(sent) => {
106 log(&stream, request, response, sent);
107 },
108 Err(_) => {
109 log(&stream, request, response, 0);
110 },
111 }
112}
113
114fn handle_connection(
116 mut stream: TcpStream,
117 session_manager: Arc<SessionManager>,
118 middleware: &Middleware,
119 router: &Router
120) {
121 let peer_addr = match stream.peer_addr() {
122 Ok(addr) => Some(addr),
123 Err(_) => None,
124 };
125 let mut buf: [u8; 4096] = [0; 4096];
126 let read_sz = match stream.read(&mut buf) {
127 Err(_e) => {
128 debug_eprintln!("{}", _e);
129 let _ = stream.shutdown(std::net::Shutdown::Both);
130 return;
131 },
132 Ok(sz) => sz,
133 };
134
135 match read_sz {
136 0 => {
138 let _ = stream.shutdown(std::net::Shutdown::Both);
139 return
140 },
141 _ => {
142 let request = match Request::new(&buf, peer_addr.as_ref()) {
143 Err(RequestError::ProtoVersionInvalid(_)) => {
144 let request = Rc::new(RefCell::new(Request::bad()));
145 let response = Rc::new(RefCell::new(Response::bad()));
146 response.borrow_mut().code = "505";
147 stream_write(&mut stream, request, response);
148 let _ = stream.shutdown(std::net::Shutdown::Both);
149 return;
150 },
151 Err(_) => {
152 let request = Rc::new(RefCell::new(Request::bad()));
153 let response = Rc::new(RefCell::new(Response::bad()));
154 stream_write(&mut stream, request, response);
155 let _ = stream.shutdown(std::net::Shutdown::Both);
156 return;
157 },
158 Ok(req) => req,
159 };
160
161 let request_rc = Rc::new(RefCell::new(request));
162 let mut session_id = Uuid::nil();
163 let response = Response::new(request_rc.clone(), session_manager.clone(), &mut session_id);
164 let response_rc = Rc::new(RefCell::new(response));
165 let mut ctx = Context::new(request_rc.clone(), response_rc.clone(), session_id, session_manager.clone());
166
167 middleware.run(&mut ctx);
168 router.call(&mut ctx);
169
170 stream_write(&mut stream, request_rc, response_rc);
171 },
172 };
173 let _ = stream.shutdown(std::net::Shutdown::Both);
174}
175
176#[derive(Default)]
178pub struct Immortal {
179 middleware: Middleware,
180 router: Router,
181 session_manager: Arc<SessionManager>,
182 #[allow(dead_code)]
183 session_prune_task: Option<(JoinHandle<()>, Arc<AtomicBool>)>,
184}
185
186#[allow(dead_code)]
187impl Immortal {
188 pub fn new() -> Self {
190 Self {
191 middleware: Middleware::new(),
192 router: Router::new(),
193 session_manager: Arc::new(SessionManager::default()),
194 session_prune_task: None,
195 }
196 }
197
198 pub fn listen<S>(
201 &self,
202 socket_addr: S
203 ) -> Result<(), ImmortalError> where S: Into<SocketAddr> {
204 self.listen_with(
205 socket_addr,
206 thread::available_parallelism()
207 .map_err(ImmortalError::Io)?
208 .get(),
209 )
210 }
211
212 pub fn listen_with<S>(
216 &self,
217 socket_addr: S,
218 #[allow(unused_variables)]
219 thread_count: usize
220 ) -> Result<(), ImmortalError> where S: Into<SocketAddr> {
221 let socket_addr: SocketAddr = socket_addr.into();
222 let listener = TcpListener::bind(socket_addr)
223 .map_err(ImmortalError::Io)?;
224
225 println!("Server starting at: http://{socket_addr}");
226
227 #[cfg(feature = "threading")]
228 {
229 let thread_pool = rayon::ThreadPoolBuilder::new()
230 .num_threads(thread_count)
231 .build()
232 .map_err(ImmortalError::Tpbe)?;
233
234 let _ = thread_pool.scope(|scope| -> Result<(), ImmortalError> {
235
236 loop {
237 let (stream, _peer_addr) = listener.accept()
238 .map_err(ImmortalError::AcceptError)?;
239
240 scope.spawn(|_s| {
241 handle_connection(
242 stream,
243 self.session_manager.clone(),
244 &self.middleware,
245 &self.router,
246 );
247 });
248 }
249 });
250 }
251
252 #[cfg(not(feature = "threading"))]
253 loop {
254 let (stream, _peer_addr) = listener.accept()
255 .map_err(ImmortalError::AcceptError)?;
256
257 handle_connection(
258 stream,
259 self.session_manager.clone(),
260 &self.middleware,
261 &self.router,
262 );
263 }
264
265 #[cfg(feature = "threading")]
266 Ok(())
267 }
268
269 pub fn process_buffer(&mut self, request_buffer: &[u8]) -> Vec<u8> {
272 let request = match Request::from_slice(request_buffer) {
273 Err(_) => return Response::bad().serialize(),
274 Ok(req) => req,
275 };
276
277 let request_rc = Rc::new(RefCell::new(request));
278 let mut session_id = Uuid::nil();
279 let response = Response::new(request_rc.clone(), self.session_manager.clone(), &mut session_id);
280 let response_rc = Rc::new(RefCell::new(response));
281 let mut ctx = Context::new(request_rc.clone(), response_rc.clone(), session_id, self.session_manager.clone());
282
283 self.middleware.run(&mut ctx);
284 self.router.call(&mut ctx);
285
286 let data = response_rc.borrow_mut().serialize();
287 data
288 }
289
290 pub fn add_middleware(&mut self, func: Handler) {
296 self.middleware.push(func);
297 }
298
299 pub fn register(&mut self, method: &str, route: &str, func: Handler) -> bool {
302 self.router.register(method, route, func)
303 }
304
305 pub fn unregister(&mut self, method: &str, route: &str) -> bool {
308 self.router.unregister(method, route)
309 }
310
311 pub fn fallback(&mut self, func: Handler) {
314 self.router.fallback = func;
315 }
316
317 pub fn set_session_duration(&self, duration: Duration) {
320 self.session_manager.set_session_duration(duration);
321 }
322
323 pub fn set_inactive_duration(&self, duration: Duration) {
325 self.session_manager.set_inactive_duration(duration);
326 }
327
328 pub fn set_prune_rate(&self, duration: Duration) {
330 self.session_manager.set_prune_rate(duration);
331 }
332
333 pub fn disable_sessions(&mut self) {
335 self.session_manager.disable();
336
337 #[cfg(feature = "threading")]
338 if let Some((handle, stop)) = self.session_prune_task.take() {
339 stop.store(true, std::sync::atomic::Ordering::Relaxed);
340 handle.join().unwrap();
341 }
342 }
343
344 pub fn enable_sessions(&mut self) {
346 self.session_manager.enable();
347
348 #[cfg(feature = "threading")]
349 {
350 let stop = Arc::new(AtomicBool::new(false));
351 let stop_clone = stop.clone();
352 let session_manager_clone = self.session_manager.clone();
353
354 let thread_handle = thread::spawn(|| {
355 session::session_prune_task(
356 session_manager_clone,
357 stop_clone,
358 );
359 });
360
361 self.session_prune_task = Some((thread_handle, stop));
362 }
363 }
364}
365
366impl Drop for Immortal {
367 fn drop(&mut self) {
368 self.disable_sessions();
369 }
370}
371