speed_rs_core/
lib.rs

//! SpeedRs provides a fast, efficient way to construct an HTTP server.
2
3/// More utilities
4pub mod utils;
5
6use std::{
7    collections::HashMap,
8    io::{BufRead, BufReader, Read, Write},
9    net::{TcpListener, TcpStream},
10    panic,
11    sync::{
12        mpsc::{self, Receiver, Sender},
13        Arc, Mutex, RwLock,
14    },
15    thread::{spawn, JoinHandle}, error::Error, vec,
16};
17
18// Enums
19
20/// HTTP server run mode
21/// - `SingleThread` - run in single thread
22/// - `MultiThread` - run with a thread pool (`HttpServerThreadPool`)
23///
24/// Example:
25/// ```rust
26/// let mut server = HttpServer::new(HttpServerMode::SingleThread, "127.0.0.1:3000");
27/// let mut server = HttpServer::new(HttpServerMode::MultiThread(HttpServerThreadPool::new(2)), "127.0.0.1:3000");
28/// ```
29pub enum HttpServerMode {
30    SingleThread,
31    MultiThread(HttpServerThreadPool),
32}
33
34// Types
35
/// A unit of work sent to a thread-pool executor: a boxed closure that is run once on a worker thread.
type ExecutorJob = Box<dyn FnOnce() + Send + 'static>;
37
38/// Handle function for HTTP request.
39///
40/// Example:
41/// ```rust
42/// server.insert_handler(|mut req, mut res| {
43///     res.set_status(HttpStatusStruct(200, "OK"));
44///     res.set_body(String::from("value"), String::from("Hello World!"));
45///     Ok((req, res))
46/// });
47/// ```
48pub type RequestHandleFunc = Box<dyn Fn(HttpRequest, HttpResponse) -> Result<(HttpRequest, HttpResponse), (HttpRequest, HttpResponse, Box<dyn Error>)> + Send + Sync + 'static>;
49
50/// Handle function for HTTP request when Error
51/// 
52/// Example:
53/// ```rust
54/// server.set_error_handler(|req, mut res, err| {
55///     res.set_status(HttpStatusStruct(500, "Interal Server Error"));
56///     res.text(format!("Unhandled exception: {:?}", err));
57///     (req, res)
58/// });
59/// ```
60pub type RequestErrorHandleFunc = Box<dyn Fn(HttpRequest, HttpResponse, Box<dyn Error>) -> (HttpRequest, HttpResponse) + Send + Sync + 'static>;
61
62// Traits
63
64// Declarations
/// HTTP status line data: the numeric status code followed by its reason phrase.
///
/// # Examples
/// ```rust
/// HttpStatusStruct(200, "OK");
/// HttpStatusStruct(404, "Not Found");
/// HttpStatusStruct(500, "This is not a bug. It is a feature.");
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct HttpStatusStruct(pub i32, pub &'static str);
74
75/// Thread pool implementation for multi-thread HTTP server process.
76/// ```rust
77/// HttpServerThreadPool::new(4)    // 4 threads for handling requests
78/// ```
79pub struct HttpServerThreadPool {
80    size: usize,
81    executors: Vec<HttpServerThreadExecutor>,
82    sender: Option<Sender<ExecutorJob>>,
83}
84
/// One worker of the thread pool.
struct HttpServerThreadExecutor {
    /// Identifier assigned by the pool when the executor is created.
    id: usize,
    /// Handle of the worker thread; an `Option` so it can be taken out and joined.
    thread: Option<JoinHandle<()>>,
}
89
90/// The almighty HTTP server.
91///
92/// Guide:
93/// 1. Create the server
94/// ```rust
95/// let mut server = HttpServer::new(HttpServerMode::MultiThread(HttpServerThreadPool::new(2)), "127.0.0.1:3000");
96/// ```
97/// 2. Insert handlers
98/// ```rust
99/// server.insert_handler(|mut req, mut res| {
100///     res.set_status(HttpStatusStruct(200, "OK"));
101///     res.set_body(String::from("value"), String::from("Hello World!"));
102///     Ok(req, res)
103/// });
104/// ```
105/// 3. Listen
106/// ```rust
107/// server.listen(|| {
108///     println!("Server is listening at http://127.0.0.1:3000");
109/// });
110/// ```
111pub struct HttpServer {
112    mode: HttpServerMode,
113    listener: TcpListener,
114    handlers: Arc<RwLock<Vec<RequestHandleFunc>>>,
115    error_handler: Arc<RwLock<RequestErrorHandleFunc>>
116}
117
/// A parsed HTTP request as handed to request handlers.
#[derive(Debug, Clone)]
pub struct HttpRequest {
    /// Header fields, keyed by the header name exactly as it was stored at parse time.
    headers: HashMap<String, String>,
    /// Raw request body bytes (empty when the request carried no body).
    body: Vec<u8>,
    /// First token of the request line, e.g. `GET`.
    method: String,
    /// Second token of the request line (the request target).
    uri: String,
    /// Third token of the request line, e.g. `HTTP/1.1`.
    version: String,
}
125
126pub struct HttpResponse {
127    headers: HashMap<String, String>,
128    body: Vec<u8>,
129    status: HttpStatusStruct,
130}
131
132// Implementations
133
134impl HttpServerThreadPool {
135    pub fn new(size: usize) -> Self {
136        assert!(size > 0, "Size of thread pool must be greater than 0");
137
138        let (sender, receiver) = mpsc::channel::<ExecutorJob>();
139
140        let receiver = Arc::new(Mutex::new(receiver));
141
142        let mut executors: Vec<HttpServerThreadExecutor> = Vec::with_capacity(size);
143
144        for i in 0..size {
145            executors.push(HttpServerThreadExecutor::new(i + 1, Arc::clone(&receiver)));
146        }
147
148        Self {
149            size,
150            executors,
151            sender: Some(sender),
152        }
153    }
154
155    fn execute<F>(&self, f: F)
156    where
157        F: FnOnce() + Send + 'static,
158    {
159        let job = Box::new(f);
160        self.sender.as_ref().unwrap().send(job).unwrap();
161    }
162}
163
164// Clean up the thread pool
165impl Drop for HttpServerThreadPool {
166    fn drop(&mut self) {
167        drop(self.sender.take());
168
169        for executor in &mut self.executors {
170            // println!("Shutting the executor {} down...", executor.id);
171
172            if let Some(thread) = executor.thread.take() {
173                thread.join().unwrap();
174            }
175
176            // println!("Executor {} shutted down.", executor.id);
177        }
178    }
179}
180
181impl HttpServerThreadExecutor {
182    pub fn new(id: usize, receiver: Arc<Mutex<Receiver<ExecutorJob>>>) -> Self {
183        let thread = spawn(move || loop {
184            let job = receiver.lock().unwrap().recv();
185
186            match job {
187                Ok(job) => {
188                    // println!("Executor {} received a job. Begin executing...", id);
189
190                    job();
191
192                    // println!("Executor {} finished its job.", id);
193                }
194                Err(_err) => {
195                    // println!("{:?}", err);
196                    // println!("Shutting executor down!");
197                    break;
198                }
199            }
200        });
201
202        Self {
203            id,
204            thread: Some(thread),
205        }
206    }
207}
208
209impl HttpServer {
210    /**
211     * This function extract string data from the TCP stream request
212     */
213    fn handle_tcp_stream(stream: TcpStream, request_handles: Arc<RwLock<Vec<RequestHandleFunc>>>, request_error_handle: Arc<RwLock<RequestErrorHandleFunc>>) {
214        // init reader
215        let mut reader = BufReader::new(&stream);
216
217        // read the request headlines
218        let request_headlines: Vec<String> = reader
219            .by_ref()
220            .lines()
221            .map(|line| line.unwrap())
222            .take_while(|line| !line.is_empty())
223            .collect();
224
225        // find content length and content type
226        let content_length = request_headlines
227            .iter()
228            .find_map(|line| {
229                let parts: Vec<_> = line.splitn(2, ':').collect();
230                if parts[0].to_lowercase() == "content-length" {
231                    parts.get(1)?.trim().parse::<usize>().ok()
232                } else {
233                    None
234                }
235            })
236            .unwrap_or(0);
237
238        // read the request body
239        let mut body = Vec::<u8>::new();
240        if content_length > 0 {
241            body = vec![0; content_length];
242            reader.by_ref().read_exact(&mut body).unwrap();
243        }
244        let mut req = HttpRequest::new(request_headlines, body);
245        let mut res = HttpResponse::new();
246
247        for handle in request_handles.read().unwrap().iter() {
248            (req, res) = match handle(req, res) {
249                Ok((req, res)) => (req, res),
250                Err((req, res, e)) => request_error_handle.read().unwrap()(req, res, e)
251            }
252        }
253
254        HttpServer::write_response(stream, req, res);
255    }
256
257    /**
258     * Server write the response to client
259     */
260    fn write_response(mut stream: TcpStream, req: HttpRequest, mut res: HttpResponse) {
261        // construct response body
262        if !res.headers().contains_key("Content-Type") {
263            res.insert_header(String::from("Content-Type"), String::from("application/octet-stream"));
264        }
265        res.insert_header(
266            String::from("Content-Length"),
267            String::from(res.body().len().to_string()),
268        );
269
270        // construct response headlines
271        let mut response_headlines = Vec::<String>::new();
272        response_headlines.push(String::from(format!(
273            "{} {} {}",
274            req.version(),
275            res.status().0,
276            res.status().1
277        )));
278
279        for header in res.headers() {
280            response_headlines.push(String::from(format!("{}: {}", header.0, header.1)));
281        }
282
283        // construct response string
284        let mut response_string = String::new();
285
286        for line in response_headlines {
287            response_string.push_str(&line);
288            response_string.push('\n');
289        }
290        response_string.push('\n');
291        let mut response_data = Vec::from(response_string.as_bytes());
292        response_data.append(&mut res.body);
293
294        // println!("Response string: {}", &response_string);
295
296        stream.write_all(&response_data).unwrap();
297    }
298
299    pub fn new(mode: HttpServerMode, bind_adr: &str) -> Self {
300        let listener = TcpListener::bind(bind_adr).unwrap();
301        let default_error_handler = |req: HttpRequest, mut res: HttpResponse, err: Box<dyn Error>| {
302            res.set_status(HttpStatusStruct(500, "Interal Server Error"));
303            res.insert_header(String::from("Content-Type"), String::from("text/plain"));
304            res.text(format!("Unhandled exception: {:?}", err));
305            (req, res)
306        };
307        Self {
308            mode,
309            listener,
310            handlers: Arc::new(RwLock::new(Vec::<RequestHandleFunc>::new())),
311            error_handler: Arc::new(RwLock::new(Box::new(default_error_handler)))
312        }
313    }
314
315    pub fn listen<F>(&self, cb: F) where F: Fn() {
316        cb();
317        for stream in self.listener.incoming() {
318            let stream = stream.unwrap();
319            let handles_arc = Arc::clone(&self.handlers);
320            let error_handle_arc = Arc::clone(&self.error_handler);
321            match &self.mode {
322                HttpServerMode::SingleThread => {
323                    if let Err(e) = panic::catch_unwind(move || HttpServer::handle_tcp_stream(stream, handles_arc, error_handle_arc)) {
324                        println!("Panic occurred in handle_tcp_stream()!");
325                        println!("Error: {:?}", e);
326                    }
327                }
328                HttpServerMode::MultiThread(pool) => {
329                    pool.execute(move || {
330                        if let Err(e) = panic::catch_unwind(move || HttpServer::handle_tcp_stream(stream, handles_arc, error_handle_arc)) {
331                            println!("Panic occurred in handle_tcp_stream()!");
332                            println!("Error: {:?}", e);
333                        }
334                    });
335                }
336            }
337        }
338    }
339
340    pub fn insert_handler<F>(&mut self, handler: F)
341                where F: Fn(HttpRequest, HttpResponse) -> Result<(HttpRequest, HttpResponse), (HttpRequest, HttpResponse, Box<dyn Error>)> + Send + Sync + 'static {
342        let mut writter = self.handlers.write().unwrap();
343        writter.push(Box::new(handler));
344    }
345
346    /// Custom error handling function
347    /// 
348    /// Example:
349    /// ```rust
350    /// server.set_error_handler(|req, mut res, err| {
351    ///     res.set_status(HttpStatusStruct(500, "Interal Server Error"));
352    ///     res.text(format!("Unhandled exception: {:?}", err));
353    ///     (req, res)
354    /// });
355    /// ```
356    pub fn set_error_handler<F>(&mut self, handler: F)
357                where F: Fn(HttpRequest, HttpResponse, Box<dyn Error>) -> (HttpRequest, HttpResponse) + Send + Sync + 'static {
358        let mut writter = self.error_handler.write().unwrap();
359        *writter = Box::new(handler);
360    }
361}
362
363impl HttpRequest {
364    fn new(mut request_headlines: Vec<String>, body: Vec<u8>) -> Self {
365        // get the first line out
366        let first_line = request_headlines.remove(0);
367        let metadata: Vec<&str> = first_line.split(" ").collect();
368        let method = String::from(metadata[0]);
369        let uri = String::from(metadata[1]);
370        let version = String::from(metadata[2]);
371
372        // transform header strings to headers map
373        let mut headers = HashMap::<String, String>::new();
374        for line in request_headlines {
375            let elements: Vec<&str> = line.split(": ").collect();
376            if elements.len() >= 2 {
377                headers.insert(String::from(elements[0]), String::from(elements[1]));
378            }
379        }
380
381        Self {
382            headers,
383            body,
384            method,
385            uri,
386            version,
387        }
388    }
389
390    /// Retrieve the request headers
391    pub fn headers(&self) -> &HashMap<String, String> {
392        &self.headers
393    }
394
395    /// Retrieve the request body
396    pub fn body(&self) -> &Vec<u8> {
397        &self.body
398    }
399
400    /// Retrieve the request method
401    pub fn method(&self) -> &String {
402        &self.method
403    }
404
405    /// Retrieve the request URI
406    pub fn uri(&self) -> &String {
407        &self.uri
408    }
409
410    /// Retrieve the HTTP version
411    pub fn version(&self) -> &String {
412        &self.version
413    }
414}
415
416impl HttpResponse {
417    fn new() -> Self {
418        let headers = HashMap::<String, String>::new();
419        let status = HttpStatusStruct(404, "Not Found");
420
421        Self {
422            headers,
423            body: Vec::new(),
424            status,
425        }
426    }
427
428    /// Insert a pair key - value to response headers (if key is already existed, replace the old value of key)
429    pub fn insert_header(&mut self, key: String, value: String) {
430        self.headers.insert(key, value);
431    }
432
433    /// Retrieve the response headers
434    pub fn headers(&self) -> &HashMap<String, String> {
435        &self.headers
436    }
437
438    /// Retrieve the response body
439    pub fn body(&self) -> &[u8] {
440        &self.body
441    }
442
443    /// Retrieve the response body as string
444    pub fn body_string(&self) -> Result<String, std::string::FromUtf8Error> {
445        String::from_utf8(self.body.clone())
446    }
447
448    /// Set the response body text
449    pub fn text(&mut self, t: String) {
450        self.body = Vec::from(t.as_bytes());
451    }
452
453    pub fn bytes(&mut self, b: Vec<u8>) {
454        self.body = b;
455    }
456
457    /// Retrieve the response status
458    pub fn status(&self) -> &HttpStatusStruct {
459        &self.status
460    }
461
462    /// Set the response status
463    pub fn set_status(&mut self, status: HttpStatusStruct) {
464        self.status = status;
465    }
466}