mini_web_server/
lib.rs

//! mini-web-server
//!
//! `mini-web-server` is a simple HTTP web server that uses a thread pool to serve requests concurrently.
//! It accepts two optional command-line arguments, in this order: the number of worker threads and the port number.
//! The defaults are 4 worker threads and port 7878.
//!
//! For example, to run the server with 10 worker threads listening on port 8787, run the following command:
//! ./mini-web-server 10 8787
8
9use std::{
10    error::Error,
11    fs,
12    io::{prelude::*, BufReader},
13    net::{TcpListener, TcpStream},
14    sync::{mpsc, Arc, Mutex},
15    thread,
16    time::Duration,
17};
18
/// Default number of worker threads, used when no thread count is given on the command line.
const THREAD_SIZE: usize = 4;
/// Default TCP listening port, used when no port is given on the command line.
const PORT: usize = 7878;

/// A unit of work handed to a worker thread: a boxed closure that runs once and can be
/// moved across threads.
type Job = Box<dyn FnOnce() + Send + 'static>;
23
24pub struct Config {
25    pub thread_size: usize,
26    pub port: usize,
27}
28
29impl Config {
30    pub fn build(mut args: impl Iterator<Item = String>) -> Result<Config, &'static str> {
31        args.next();
32
33        let thread_size = match args.next() {
34            Some(arg) => {
35                if let Ok(arg) = arg.parse() {
36                    arg
37                } else {
38                    return Err("Please type a number for number of worker threads.");
39                }
40            }
41            None => {
42                println!(
43                    "No thread pool size specified. Using default {THREAD_SIZE} worker threads."
44                );
45                THREAD_SIZE
46            }
47        };
48
49        let port = match args.next() {
50            Some(arg) => {
51                if let Ok(arg) = arg.parse() {
52                    arg
53                } else {
54                    return Err("Please type a number for the TCP listening port higher than 1023. Default port is 7878.");
55                }
56            }
57            None => {
58                println!("No port number specified. Using default {PORT} worker threads.");
59                PORT
60            }
61        };
62
63        Ok(Config { thread_size, port })
64    }
65}
66
67struct Worker {
68    _id: usize,
69    thread: Option<thread::JoinHandle<()>>,
70}
71
72impl Worker {
73    pub fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
74        println!("Creating Worker {id}.");
75
76        // create a thread that continuously loops checking the channel for jobs
77        let thread = thread::spawn(move || loop {
78            let msg = receiver.lock().unwrap().recv();
79            match msg {
80                Ok(job) => {
81                    println!("Worker {id} receiver job; executing...");
82                    job();
83                }
84                Err(_e) => {
85                    println!("Worker {id} received disconnection request. Shutting down");
86                    break;
87                }
88            }
89
90            println!("Worker {id} completed execution.");
91        });
92
93        Worker {
94            _id: id,
95            thread: Some(thread),
96        }
97    }
98}
99pub struct ThreadPool {
100    worker_threads: Vec<Worker>,
101    sender: Option<mpsc::Sender<Job>>,
102}
103
104impl ThreadPool {
105    /// Create a new ThreadPool.
106    ///
107    /// The size is the number of threads in the pool.
108    ///
109    /// # Panics
110    ///
111    /// The `new` function will panic if the size is zero.
112    /// Todo: return Result<ThreadPool, PoolCreationError>
113    pub fn new(size: usize) -> ThreadPool {
114        assert!(size > 0);
115
116        // create a mpsc channel to send closure function implmenting work to be executed by spawned thread
117        let (sender, receiver) = mpsc::channel();
118
119        //wrap receiver in Arc<Mutex<T>> to share between threads since mpsc is multiple producer single consumer
120        let receiver = Arc::new(Mutex::new(receiver));
121
122        println!("Setting up {size} workers...");
123        let mut worker_threads = Vec::with_capacity(size);
124        for id in 0..size {
125            worker_threads.push(Worker::new(id + 1, Arc::clone(&receiver))); //create threads
126        }
127        ThreadPool {
128            worker_threads,
129            sender: Some(sender),
130        }
131    }
132
133    pub fn execute<T>(&self, f: T)
134    where
135        T: FnOnce() + Send + 'static,
136    {
137        let job = Box::new(f);
138        self.sender.as_ref().unwrap().send(job).unwrap();
139    }
140}
141
142impl Drop for ThreadPool {
143    fn drop(&mut self) {
144        //close the channel by dropping the sender end
145        //so that the receiver end receives error to exit the worker thread's loop
146        drop(self.sender.take());
147
148        //shut down worker threads by calling join.
149        for worker in &mut self.worker_threads {
150            if let Some(thread) = worker.thread.take() {
151                thread.join().unwrap();
152            }
153        }
154    }
155}
156
157pub fn run(config: Config) -> Result<(), Box<dyn Error>> {
158    //listen for tcp connections with TcpListner and bind to a port
159    let address = format!("127.0.0.1:{}", config.port);
160    let listener = TcpListener::bind(address)?;
161    let thread_pool = ThreadPool::new(config.thread_size);
162
163    //iterate through sequence of streams
164    for stream in listener.incoming() {
165        let stream = stream?;
166
167        thread_pool.execute(|| {
168            handle_connection(stream);
169        });
170    }
171
172    Ok(())
173}
174
/// Serves one HTTP connection and logs any I/O failure instead of panicking.
///
/// Runs on a pool worker thread, so a failure here must not take the thread down.
fn handle_connection(mut stream: TcpStream) {
    if let Err(err) = respond(&mut stream) {
        eprintln!("Failed to handle connection: {err}");
    }
}

/// Reads the request from `stream`, picks the page for its request line and writes the
/// response.
///
/// Routes: `GET /` serves `welcome.html`; `GET /sleep` does the same after a 5 second
/// delay; anything else gets a 404 with `error.html`. If the page file cannot be read,
/// an empty 500 response is sent instead.
///
/// Returns `Ok(())` without writing anything when the peer closes the connection before
/// sending a request line.
fn respond(stream: &mut TcpStream) -> std::io::Result<()> {
    let mut lines = BufReader::new(&*stream).lines();

    // A client may connect and disconnect without sending a single line.
    let http_request_line = match lines.next() {
        Some(line) => line?,
        None => return Ok(()),
    };

    // Handle routes.
    let (status_line, file_name) = match http_request_line.as_str() {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK\r\n", "welcome.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK\r\n", "welcome.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND\r\n", "error.html"),
    };

    // Collect the remaining header lines up to the blank line that ends the request head.
    // Read errors are let through `take_while` so that `collect` can report them.
    let http_request = lines
        .take_while(|line| line.as_ref().map_or(true, |text| !text.is_empty()))
        .collect::<std::io::Result<Vec<_>>>()?;
    println!("Connection Established. HTTP Req => {http_request_line}\n{http_request:#?}");

    let response = match fs::read_to_string(file_name) {
        Ok(contents) => {
            let content_length = contents.len();
            format!("{status_line}Content-Length: {content_length}\r\n\r\n{contents}")
        }
        Err(err) => {
            eprintln!("Could not read {file_name}: {err}");
            String::from("HTTP/1.1 500 INTERNAL SERVER ERROR\r\nContent-Length: 0\r\n\r\n")
        }
    };

    stream.write_all(response.as_bytes())
}