immortal_http/
lib.rs

1#![feature(iter_intersperse)]
2
3use std::cell::RefCell;
4use std::fmt::Display;
5use std::io::{self, Read, Write};
6use std::net::{TcpListener, TcpStream, SocketAddr};
7use std::rc::Rc;
8use std::sync::atomic::AtomicBool;
9use std::time::Duration;
10use std::error;
11use std::thread::{self, JoinHandle};
12use std::sync::Arc;
13
14pub mod context;
15pub mod cookie;
16pub mod middleware;
17pub mod request;
18pub mod response;
19pub mod router;
20pub mod session;
21pub mod util;
22
23pub use request::Request;
24use request::RequestError;
25pub use response::Response;
26pub use context::Context;
27use middleware::Middleware;
28use router::{Router, Handler};
29use session::SessionManager;
30use util::{strip_for_terminal, code_color};
31use uuid::Uuid;
32
33use chrono::Utc;
34use colored::*;
35use debug_print::debug_eprintln;
36
37#[derive(Debug)]
38pub enum ImmortalError<'a> {
39    /// Io: std::io::Error
40    Io(io::Error),
41    /// Rayon: Thread Pool Build Error
42    #[cfg(feature = "threading")]
43    Tpbe(rayon::ThreadPoolBuildError),
44    /// TCP accept() failed
45    AcceptError(io::Error),
46    RequestError(RequestError<'a>),
47}
48
49impl Display for ImmortalError<'_> {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        write!(f, "{:?}", self)
52    }
53}
54impl error::Error for ImmortalError<'_> {}
55
56#[inline]
57fn log(stream: &TcpStream, req: Rc<RefCell<Request>>, resp: Rc<RefCell<Response>>, sent: usize) {
58    let remote_socket = match stream.peer_addr() {
59        Err(_) => "<no socket>".red().bold(),
60        Ok(s) => s.ip().to_string().normal(),
61    };
62
63    let now = Utc::now();
64    let date_time = now.format("%a, %d %b %Y %H:%M:%S").to_string();
65    let time_stamp = format!("[{:<17}]",
66                             format!("{}.{}",
67                                now.timestamp(),
68                                now.timestamp_subsec_micros())
69                            .bright_blue());
70
71    let method = match req.borrow().method {
72        "" => "<no method>".red().bold(),
73        _ => strip_for_terminal(req.borrow().method).normal(),
74    };
75
76    let document = match req.borrow().document {
77        "" => "<no document>".red().bold(),
78        _ => strip_for_terminal(&req.borrow().document).normal(),
79    };
80
81    let user_agent = match req.borrow_mut().header("User-Agent") {
82        None => "<no user-agent>".red().bold(),
83        Some(thing) => strip_for_terminal(thing).normal(),
84    };
85
86    println!("{}  {}  {}  {}\t{}  {}\t{}\t{}",
87             date_time,
88             time_stamp,
89             remote_socket,
90             method,
91             code_color(resp.borrow().code),
92             sent,
93             if req.borrow_mut().query_raw.is_empty() {
94                document
95             } else {
96                format!("{}?{}", document, &strip_for_terminal(&req.borrow_mut().query_raw)).normal()
97             },
98             user_agent);
99}
100
101#[inline]
102fn stream_write(stream: &mut TcpStream, request: Rc<RefCell<Request>>, response: Rc<RefCell<Response>>) {
103    let data = response.borrow_mut().serialize();
104    match stream.write(data.as_slice()) {
105        Ok(sent) => {
106            log(&stream, request, response, sent);
107        },
108        Err(_) => {
109            log(&stream, request, response, 0);
110        },
111    }
112}
113
114/// Reads the TcpStream and handles errors while reading
115fn handle_connection(
116    mut stream: TcpStream,
117    session_manager: Arc<SessionManager>,
118    middleware: &Middleware,
119    router: &Router
120) {
121    let peer_addr = match stream.peer_addr() {
122        Ok(addr) => Some(addr),
123        Err(_) => None,
124    };
125    let mut buf: [u8; 4096] = [0; 4096];
126    let read_sz = match stream.read(&mut buf) {
127        Err(_e) => {
128            debug_eprintln!("{}", _e);
129            let _ = stream.shutdown(std::net::Shutdown::Both);
130            return;
131        },
132        Ok(sz) => sz,
133    };
134
135    match read_sz {
136        //0 => break,
137        0 => {
138            let _ = stream.shutdown(std::net::Shutdown::Both);
139            return
140        },
141        _ => {
142            let request = match Request::new(&buf, peer_addr.as_ref()) {
143                Err(RequestError::ProtoVersionInvalid(_)) => {
144                    let request = Rc::new(RefCell::new(Request::bad()));
145                    let response = Rc::new(RefCell::new(Response::bad()));
146                    response.borrow_mut().code = "505";
147                    stream_write(&mut stream, request, response);
148                    let _ = stream.shutdown(std::net::Shutdown::Both);
149                    return;
150                },
151                Err(_) => {
152                    let request = Rc::new(RefCell::new(Request::bad()));
153                    let response = Rc::new(RefCell::new(Response::bad()));
154                    stream_write(&mut stream, request, response);
155                    let _ = stream.shutdown(std::net::Shutdown::Both);
156                    return;
157                },
158                Ok(req) => req,
159            };
160
161            let request_rc = Rc::new(RefCell::new(request));
162            let mut session_id = Uuid::nil();
163            let response = Response::new(request_rc.clone(), session_manager.clone(), &mut session_id);
164            let response_rc = Rc::new(RefCell::new(response));
165            let mut ctx = Context::new(request_rc.clone(), response_rc.clone(), session_id, session_manager.clone());
166
167            middleware.run(&mut ctx);
168            router.call(&mut ctx);
169
170            stream_write(&mut stream, request_rc, response_rc);
171        },
172    };
173    let _ = stream.shutdown(std::net::Shutdown::Both);
174}
175
176/// Immortal middleware and routing configuration, as well as the session manager.
177#[derive(Default)]
178pub struct Immortal {
179    middleware: Middleware,
180    router: Router,
181    session_manager: Arc<SessionManager>,
182    #[allow(dead_code)]
183    session_prune_task: Option<(JoinHandle<()>, Arc<AtomicBool>)>,
184}
185
186#[allow(dead_code)]
187impl Immortal {
188    /// Construct a new Immortal server
189    pub fn new() -> Self {
190        Self {
191            middleware: Middleware::new(),
192            router: Router::new(),
193            session_manager: Arc::new(SessionManager::default()),
194            session_prune_task: None,
195        }
196    }
197
198    /// Listens for incoming connections, with as many threads as the system has available for
199    /// parallelism
200    pub fn listen<S>(
201        &self,
202        socket_addr: S
203    ) -> Result<(), ImmortalError> where S: Into<SocketAddr> {
204        self.listen_with(
205            socket_addr,
206            thread::available_parallelism()
207                .map_err(ImmortalError::Io)?
208                .get(),
209        )
210    }
211
212    /// Listens for incoming connections using a specific amount of threads
213    ///
214    /// If `threading` feature is not present, `thread_count` will be ignored
215    pub fn listen_with<S>(
216        &self,
217        socket_addr: S,
218        #[allow(unused_variables)]
219        thread_count: usize
220    ) -> Result<(), ImmortalError> where S: Into<SocketAddr> {
221        let socket_addr: SocketAddr = socket_addr.into();
222        let listener = TcpListener::bind(socket_addr)
223            .map_err(ImmortalError::Io)?;
224
225        println!("Server starting at: http://{socket_addr}");
226
227        #[cfg(feature = "threading")] 
228        {
229            let thread_pool = rayon::ThreadPoolBuilder::new()
230                .num_threads(thread_count)
231                .build()
232                .map_err(ImmortalError::Tpbe)?;
233
234            let _ = thread_pool.scope(|scope| -> Result<(), ImmortalError> {
235
236                loop {
237                    let (stream, _peer_addr) = listener.accept()
238                        .map_err(ImmortalError::AcceptError)?;
239
240                    scope.spawn(|_s| {
241                        handle_connection(
242                            stream,
243                            self.session_manager.clone(),
244                            &self.middleware,
245                            &self.router,
246                        );
247                    });
248                }
249            });
250        }
251
252        #[cfg(not(feature = "threading"))]
253        loop {
254            let (stream, _peer_addr) = listener.accept()
255                .map_err(ImmortalError::AcceptError)?;
256
257            handle_connection(
258                stream,
259                self.session_manager.clone(),
260                &self.middleware,
261                &self.router,
262            );
263        }
264
265        #[cfg(feature = "threading")] 
266        Ok(())
267    }
268
269    /// Pass a buffer through the HTTP implementation without listening on a port or dispatching
270    /// tasks to threads.
271    pub fn process_buffer(&mut self, request_buffer: &[u8]) -> Vec<u8> {
272        let request = match Request::from_slice(request_buffer) {
273            Err(_) => return Response::bad().serialize(),
274            Ok(req) => req,
275        };
276
277        let request_rc = Rc::new(RefCell::new(request));
278        let mut session_id = Uuid::nil();
279        let response = Response::new(request_rc.clone(), self.session_manager.clone(), &mut session_id);
280        let response_rc = Rc::new(RefCell::new(response));
281        let mut ctx = Context::new(request_rc.clone(), response_rc.clone(), session_id, self.session_manager.clone());
282
283        self.middleware.run(&mut ctx);
284        self.router.call(&mut ctx);
285
286        let data = response_rc.borrow_mut().serialize();
287        data
288    }
289
290    /// Adds middleware that gets executed just before the router.
291    ///
292    /// if a middleware handler produces a redirect, all of the following middleware handlers are
293    /// skipped and the redirect is yielded, if middleware produces a redirect, the router is
294    /// bypassed and custom routes do not run. 
295    pub fn add_middleware(&mut self, func: Handler) {
296        self.middleware.push(func);
297    }
298
299    /// Calls into the router to register a function
300    /// Returns true if a route was registered
301    pub fn register(&mut self, method: &str, route: &str, func: Handler) -> bool {
302        self.router.register(method, route, func)
303    }
304
305    /// Calls into the router to unregister a function
306    /// Returns true if a route was unregistered
307    pub fn unregister(&mut self, method: &str, route: &str) -> bool {
308        self.router.unregister(method, route)
309    }
310
311    /// Registers the fallback function for when a request is not caught by the router
312    /// or for if you want to handle all requests manually
313    pub fn fallback(&mut self, func: Handler) {
314        self.router.fallback = func;
315    }
316
317    /// sets the maximum duration that a session may be allowed to persist for
318    /// regardless of inactivity
319    pub fn set_session_duration(&self, duration: Duration) {
320        self.session_manager.set_session_duration(duration);
321    }
322
323    /// Sets the server-side session inactivity expiry duration
324    pub fn set_inactive_duration(&self, duration: Duration) {
325        self.session_manager.set_inactive_duration(duration);
326    }
327
328    /// Sets the prune rate for sessions, will attempt to prune old sessions every `duration`
329    pub fn set_prune_rate(&self, duration: Duration) {
330        self.session_manager.set_prune_rate(duration);
331    }
332
333    /// Configures sessions to be disabled, clears existing server-side sessions
334    pub fn disable_sessions(&mut self) {
335        self.session_manager.disable();
336
337        #[cfg(feature = "threading")]
338        if let Some((handle, stop)) = self.session_prune_task.take() {
339            stop.store(true, std::sync::atomic::Ordering::Relaxed);
340            handle.join().unwrap();
341        }
342    }
343
344    /// Configures sessions to be enabled
345    pub fn enable_sessions(&mut self) {
346        self.session_manager.enable();
347
348        #[cfg(feature = "threading")]
349        {
350            let stop = Arc::new(AtomicBool::new(false));
351            let stop_clone = stop.clone();
352            let session_manager_clone = self.session_manager.clone();
353
354            let thread_handle = thread::spawn(|| {
355                session::session_prune_task(
356                    session_manager_clone,
357                    stop_clone,
358                );
359            });
360
361            self.session_prune_task = Some((thread_handle, stop));
362        }
363    }
364}
365
366impl Drop for Immortal {
367    fn drop(&mut self) {
368        self.disable_sessions();
369    }
370}
371