1use std::{
10 error::Error,
11 fs,
12 io::{prelude::*, BufReader},
13 net::{TcpListener, TcpStream},
14 sync::{mpsc, Arc, Mutex},
15 thread,
16 time::Duration,
17};
18
19const THREAD_SIZE: usize = 4;
20const PORT: usize = 7878;
21
22type Job = Box<dyn FnOnce() + Send + 'static>;
23
24pub struct Config {
25 pub thread_size: usize,
26 pub port: usize,
27}
28
29impl Config {
30 pub fn build(mut args: impl Iterator<Item = String>) -> Result<Config, &'static str> {
31 args.next();
32
33 let thread_size = match args.next() {
34 Some(arg) => {
35 if let Ok(arg) = arg.parse() {
36 arg
37 } else {
38 return Err("Please type a number for number of worker threads.");
39 }
40 }
41 None => {
42 println!(
43 "No thread pool size specified. Using default {THREAD_SIZE} worker threads."
44 );
45 THREAD_SIZE
46 }
47 };
48
49 let port = match args.next() {
50 Some(arg) => {
51 if let Ok(arg) = arg.parse() {
52 arg
53 } else {
54 return Err("Please type a number for the TCP listening port higher than 1023. Default port is 7878.");
55 }
56 }
57 None => {
58 println!("No port number specified. Using default {PORT} worker threads.");
59 PORT
60 }
61 };
62
63 Ok(Config { thread_size, port })
64 }
65}
66
67struct Worker {
68 _id: usize,
69 thread: Option<thread::JoinHandle<()>>,
70}
71
72impl Worker {
73 pub fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
74 println!("Creating Worker {id}.");
75
76 let thread = thread::spawn(move || loop {
78 let msg = receiver.lock().unwrap().recv();
79 match msg {
80 Ok(job) => {
81 println!("Worker {id} receiver job; executing...");
82 job();
83 }
84 Err(_e) => {
85 println!("Worker {id} received disconnection request. Shutting down");
86 break;
87 }
88 }
89
90 println!("Worker {id} completed execution.");
91 });
92
93 Worker {
94 _id: id,
95 thread: Some(thread),
96 }
97 }
98}
99pub struct ThreadPool {
100 worker_threads: Vec<Worker>,
101 sender: Option<mpsc::Sender<Job>>,
102}
103
104impl ThreadPool {
105 pub fn new(size: usize) -> ThreadPool {
114 assert!(size > 0);
115
116 let (sender, receiver) = mpsc::channel();
118
119 let receiver = Arc::new(Mutex::new(receiver));
121
122 println!("Setting up {size} workers...");
123 let mut worker_threads = Vec::with_capacity(size);
124 for id in 0..size {
125 worker_threads.push(Worker::new(id + 1, Arc::clone(&receiver))); }
127 ThreadPool {
128 worker_threads,
129 sender: Some(sender),
130 }
131 }
132
133 pub fn execute<T>(&self, f: T)
134 where
135 T: FnOnce() + Send + 'static,
136 {
137 let job = Box::new(f);
138 self.sender.as_ref().unwrap().send(job).unwrap();
139 }
140}
141
142impl Drop for ThreadPool {
143 fn drop(&mut self) {
144 drop(self.sender.take());
147
148 for worker in &mut self.worker_threads {
150 if let Some(thread) = worker.thread.take() {
151 thread.join().unwrap();
152 }
153 }
154 }
155}
156
157pub fn run(config: Config) -> Result<(), Box<dyn Error>> {
158 let address = format!("127.0.0.1:{}", config.port);
160 let listener = TcpListener::bind(address)?;
161 let thread_pool = ThreadPool::new(config.thread_size);
162
163 for stream in listener.incoming() {
165 let stream = stream?;
166
167 thread_pool.execute(|| {
168 handle_connection(stream);
169 });
170 }
171
172 Ok(())
173}
174
175fn handle_connection(mut stream: TcpStream) {
176 let buf_reader = BufReader::new(&stream);
177 let mut http_request = buf_reader.lines();
178 let http_request_line = http_request.next().unwrap().unwrap();
179
180 let (status_line, file_name) = match &http_request_line[..] {
182 "GET / HTTP/1.1" => ("HTTP/1.1 200 OK\r\n", "welcome.html"),
183 "GET /sleep HTTP/1.1" => {
184 thread::sleep(Duration::from_secs(5));
185 ("HTTP/1.1 200 OK\r\n", "welcome.html")
186 }
187 _ => ("HTTP/1.1 400 NOT FOUND\r\n", "error.html"),
188 };
189
190 let http_request: Vec<_> = http_request
192 .map(|result| result.unwrap())
193 .take_while(|line| !line.is_empty())
194 .collect();
195 println!("Connection Established. HTTP Req => {http_request_line}\n{http_request:#?}");
196
197 let contents = fs::read_to_string(file_name).unwrap();
198 let content_length = contents.len();
199 let response = format!("{status_line}Content-Length: {content_length}\r\n\r\n{contents}");
200
201 stream.write_all(response.as_bytes()).unwrap();
202}