1use std::error::Error;
2use std::fs::File;
3use std::io::prelude::*;
4use std::net::TcpStream;
5use std::path::Path;
6use std::sync::mpsc;
7use std::sync::Arc;
8use std::sync::Mutex;
9use std::thread;
10use std::thread::sleep;
11use std::time::Duration;
12
13trait FnBox {
16 fn call_box(self: Box<Self>);
17}
18
19impl<F: FnOnce()> FnBox for F {
20 fn call_box(self: Box<F>) {
21 (*self)()
22 }
23}
24type Job = Box<FnBox + Send + 'static>;
28
29struct Worker {
30 id: usize,
31 thread: Option<thread::JoinHandle<()>>,
32}
33
34impl Worker {
35 fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
36 let thread = thread::spawn(move || loop {
37 let message = receiver.lock().unwrap().recv().unwrap();
38
39 match message {
40 Message::NewJob(job) => job.call_box(),
41 Message::Terminate => break,
42 }
43 });
44
45 Worker {
46 id,
47 thread: Some(thread),
48 }
49 }
50}
51
52enum Message {
53 NewJob(Job),
54 Terminate,
55}
56
57pub struct ThreadPool {
59 workers: Vec<Worker>,
60 sender: mpsc::Sender<Message>,
61}
62
63impl Drop for ThreadPool {
64 fn drop(&mut self) {
65 println!("Sending terminate message to all workers.");
66
67 for _ in &mut self.workers {
68 self.sender.send(Message::Terminate).unwrap();
69 }
70
71 println!("Shutting down all workers.");
72
73 for worker in &mut self.workers {
74 println!("Shutting down worker {}", worker.id);
75
76 if let Some(thread) = worker.thread.take() {
77 thread.join().unwrap();
78 }
79 }
80 }
83}
84
85impl ThreadPool {
86 pub fn new(size: usize) -> ThreadPool {
98 assert!(size > 0);
99
100 let (sender, receiver) = mpsc::channel();
101
102 let receiver = Arc::new(Mutex::new(receiver));
103
104 let mut workers = Vec::with_capacity(size);
105
106 for id in 0..size {
107 workers.push(Worker::new(id, Arc::clone(&receiver)));
108 }
109
110 ThreadPool { workers, sender }
111 }
112
113 pub fn execute<F>(&self, f: F)
131 where
132 F: FnOnce() + Send + 'static,
133 {
134 let job = Box::new(f);
135 self.sender.send(Message::NewJob(job)).unwrap();
139 }
140}
141
142struct Request {
145 method: String,
146 resource: String,
147 protocol: String,
148}
149
150enum StatusCode {
151 OK,
152 NotFound,
153 InternalServerError,
154}
155
156struct Responder {
157 stream: TcpStream,
158}
159
160impl Responder {
161 fn new(stream: TcpStream) -> Responder {
162 Responder { stream }
163 }
164
165 fn write(&mut self, response: &[u8]) {
166 match self.stream.write(response) {
167 Ok(_) => (),
168 Err(error) => {
169 eprintln!(
170 "Caught an error when writing to TCP stream:\n{}",
171 error.description()
172 );
173 }
174 };
175
176 self.stream.flush().unwrap();
177 }
178}
179
180fn parser(message: String) -> Request {
181 let (mut method, mut resource, mut protocol) = (String::new(), String::new(), String::new());
182
183 for (count, content) in message.split_whitespace().enumerate() {
184 match count {
185 0 => method.push_str(content),
186 1 => resource.push_str(content),
187 2 => protocol.push_str(content),
188 _ => break,
189 }
190 }
191
192 Request {
193 method,
194 resource,
195 protocol,
196 }
197}
198
199fn checker(request: &Request) -> StatusCode {
200 let raw_resource = format!(".{}", &request.resource);
201
202 let resource = Path::new(&raw_resource);
203
204 if let Err(error) = File::open(resource) {
205 eprintln!("Failed to open file:\n{}", error);
206 return StatusCode::InternalServerError;
207 };
208
209 if request.method.is_empty()
210 || raw_resource.is_empty()
211 || request.protocol.is_empty()
212 || request.method != "GET"
213 || resource.is_dir()
214 || !(request.protocol == "HTTP/1.0" || request.protocol == "HTTP/1.1")
215 || raw_resource.contains("/../")
216 {
217 println!("Internal server error.");
218 return StatusCode::InternalServerError;
219 }
220
221 if resource.exists() {
222 println!("Request {}.", raw_resource);
223 StatusCode::OK
224 } else {
225 println!("Can not find {}.", raw_resource);
226 StatusCode::NotFound
227 }
228}
229
230pub fn handle_connection(mut stream: TcpStream) {
246 let mut buffer = [0; 1024];
247
248 let mut raw_request = String::new();
249
250 stream
251 .set_read_timeout(Some(Duration::from_secs(5)))
252 .unwrap_or_else(|error| {
253 eprintln!("set_read_timeout call failed:\n{}", error.description())
254 });
255
256 stream
257 .set_write_timeout(Some(Duration::from_secs(5)))
258 .unwrap_or_else(|error| {
259 eprintln!("set_write_timeout call failed:\n{}", error.description())
260 });
261
262 loop {
263 let buffer_num = match stream.read(&mut buffer) {
264 Ok(t) => (t),
265 Err(error) => {
266 eprintln!(
267 "Caught an error when reading from TCP stream:\n{}",
268 error.description()
269 );
270 return;
271 }
272 };
273
274 if buffer_num > 0 {
275 raw_request.push_str(&String::from_utf8(buffer[..buffer_num].to_vec()).unwrap());
276 }
277
278 if raw_request.ends_with("\r\n\r\n") {
279 break;
280 }
281
282 sleep(Duration::from_millis(1));
283 }
284
285 let request = parser(raw_request);
286
287 let mut responder = Responder::new(stream);
288
289 match checker(&request) {
290 StatusCode::OK => {
291 let resource = format!(".{}", &request.resource);
292
293 let mut file = File::open(resource).unwrap();
294
295 let content_length = match file.metadata() {
296 Ok(t) => t.len(),
297 Err(error) => {
298 eprintln!("Fail to read file:\n{}", error);
299 return;
300 }
301 };
302
303 let head = format!(
304 "HTTP/1.0 200 OK\r\nContent-Length: {}\r\n\r\n",
305 content_length,
306 );
307
308 let head = head.as_bytes();
309
310 responder.write(head);
311
312 let mut buffer = [0; 1024];
313
314 loop {
315 let size = match file.read(&mut buffer) {
316 Ok(t) => t,
317 Err(error) => {
318 eprintln!("Fail to read file:\n{}", error);
319 return;
320 }
321 };
322 if size > 0 {
323 responder.write(&buffer);
324 } else {
325 break;
326 }
327 }
328 }
329 StatusCode::NotFound => {
330 responder.write("HTTP/1.0 404 Not Found\r\n\r\n".as_bytes());
331 }
332 StatusCode::InternalServerError => {
333 responder.write("HTTP/1.0 500 Internal Server Error\r\n\r\n".as_bytes());
334 }
335 };
336}