ruver/
lib.rs

1use std::error::Error;
2use std::fs::File;
3use std::io::prelude::*;
4use std::net::TcpStream;
5use std::path::Path;
6use std::sync::mpsc;
7use std::sync::Arc;
8use std::sync::Mutex;
9use std::thread;
10use std::thread::sleep;
11use std::time::Duration;
12
13// # Thread Pool
14
/// Something that can be invoked exactly once while it is owned by a `Box`.
///
/// The method receives `self: Box<Self>`, so calling it consumes the box and
/// whatever it holds. That makes the trait usable as a trait object.
trait FnBox {
    /// Consume the box and run the value stored inside it.
    fn call_box(self: Box<Self>);
}
18
19impl<F: FnOnce()> FnBox for F {
20    fn call_box(self: Box<F>) {
21        (*self)()
22    }
23}
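// Why `FnBox` exists: older Rust compilers could not call a boxed `FnOnce` closure directly,
// because the closure has to be moved out of the `Box` to be invoked. Taking `self: Box<Self>`
// gives `call_box` ownership of the box so it can move the closure out and run it.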
26
27type Job = Box<FnBox + Send + 'static>;
28
/// One pool thread plus the bookkeeping needed to shut it down.
struct Worker {
    /// Index of this worker inside the pool; printed when the pool shuts down.
    id: usize,
    /// Handle of the spawned thread. Wrapped in `Option` so the handle can be moved out with
    /// `take()` when the thread is joined.
    thread: Option<thread::JoinHandle<()>>,
}
33
34impl Worker {
35    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
36        let thread = thread::spawn(move || loop {
37            let message = receiver.lock().unwrap().recv().unwrap();
38
39            match message {
40                Message::NewJob(job) => job.call_box(),
41                Message::Terminate => break,
42            }
43        });
44
45        Worker {
46            id,
47            thread: Some(thread),
48        }
49    }
50}
51
52enum Message {
53    NewJob(Job),
54    Terminate,
55}
56
57/// A thread pool, containing a vector of workers and an actually "spmc" sender.
58pub struct ThreadPool {
59    workers: Vec<Worker>,
60    sender: mpsc::Sender<Message>,
61}
62
63impl Drop for ThreadPool {
64    fn drop(&mut self) {
65        println!("Sending terminate message to all workers.");
66
67        for _ in &mut self.workers {
68            self.sender.send(Message::Terminate).unwrap();
69        }
70
71        println!("Shutting down all workers.");
72
73        for worker in &mut self.workers {
74            println!("Shutting down worker {}", worker.id);
75
76            if let Some(thread) = worker.thread.take() {
77                thread.join().unwrap();
78            }
79        }
80        // Why divide loop?
81        // We don't exactly know if it is the worker in current loop that receive our signal.
82    }
83}
84
85impl ThreadPool {
86    /// Create a new thread pool.
87    ///
88    /// # Examples
89    ///
90    /// ```
91    /// use ruver::ThreadPool;
92    ///
93    /// let pool = ThreadPool::new(256);
94    ///
95    /// // These codes created a thread pool who has 256 threads.
96    /// ```
97    pub fn new(size: usize) -> ThreadPool {
98        assert!(size > 0);
99
100        let (sender, receiver) = mpsc::channel();
101
102        let receiver = Arc::new(Mutex::new(receiver));
103
104        let mut workers = Vec::with_capacity(size);
105
106        for id in 0..size {
107            workers.push(Worker::new(id, Arc::clone(&receiver)));
108        }
109
110        ThreadPool { workers, sender }
111    }
112
113    /// Throw a task into thread pool.
114    ///
115    /// # Examples
116    ///
117    /// ```
118    /// use ruver::ThreadPool;
119    ///
120    /// let pool = ThreadPool::new(256);
121    ///
122    /// let add_one = |x| {
123    ///     println!("{}", x + 1);
124    /// };
125    ///
126    /// pool.execute(add_one(1))
127    ///
128    /// // These codes print "2".
129    /// ```
130    pub fn execute<F>(&self, f: F)
131    where
132        F: FnOnce() + Send + 'static,
133    {
134        let job = Box::new(f);
135        // Why box?
136        // Box provides a type whose size is already known, while closure do not.
137
138        self.sender.send(Message::NewJob(job)).unwrap();
139    }
140}
141
142// # Handler
143
/// The three fields of an HTTP request line, e.g. `GET /index.html HTTP/1.1`.
///
/// A field is an empty string when the corresponding token was missing from the request.
struct Request {
    /// Request method, such as `GET`.
    method: String,
    /// Requested resource path as sent by the client.
    resource: String,
    /// Protocol token, such as `HTTP/1.1`.
    protocol: String,
}
149
/// Outcome of validating a request; decides which response is sent back.
enum StatusCode {
    /// The resource can be served (`200 OK`).
    OK,
    /// The resource does not exist (`404 Not Found`).
    NotFound,
    /// The request cannot be served (`500 Internal Server Error`).
    InternalServerError,
}
155
/// Owns the client connection and writes responses to it.
struct Responder {
    /// The TCP connection to the client.
    stream: TcpStream,
}
159
160impl Responder {
161    fn new(stream: TcpStream) -> Responder {
162        Responder { stream }
163    }
164
165    fn write(&mut self, response: &[u8]) {
166        match self.stream.write(response) {
167            Ok(_) => (),
168            Err(error) => {
169                eprintln!(
170                    "Caught an error when writing to TCP stream:\n{}",
171                    error.description()
172                );
173            }
174        };
175
176        self.stream.flush().unwrap();
177    }
178}
179
180fn parser(message: String) -> Request {
181    let (mut method, mut resource, mut protocol) = (String::new(), String::new(), String::new());
182
183    for (count, content) in message.split_whitespace().enumerate() {
184        match count {
185            0 => method.push_str(content),
186            1 => resource.push_str(content),
187            2 => protocol.push_str(content),
188            _ => break,
189        }
190    }
191
192    Request {
193        method,
194        resource,
195        protocol,
196    }
197}
198
199fn checker(request: &Request) -> StatusCode {
200    let raw_resource = format!(".{}", &request.resource);
201
202    let resource = Path::new(&raw_resource);
203
204    if let Err(error) = File::open(resource) {
205        eprintln!("Failed to open file:\n{}", error);
206        return StatusCode::InternalServerError;
207    };
208
209    if request.method.is_empty()
210        || raw_resource.is_empty()
211        || request.protocol.is_empty()
212        || request.method != "GET"
213        || resource.is_dir()
214        || !(request.protocol == "HTTP/1.0" || request.protocol == "HTTP/1.1")
215        || raw_resource.contains("/../")
216    {
217        println!("Internal server error.");
218        return StatusCode::InternalServerError;
219    }
220
221    if resource.exists() {
222        println!("Request {}.", raw_resource);
223        StatusCode::OK
224    } else {
225        println!("Can not find {}.", raw_resource);
226        StatusCode::NotFound
227    }
228}
229
230/// Receive and handle a TCP stream.
231///
232/// # Example
233///
234/// ```
235/// use std::net::TcpListener;
236///
237/// let listener = match TcpListener::bind("0.0.0.0:8000").unwrap();
238///
239/// for stream in listener.incoming() {
240///     let stream = stream.unwrap();
241///
242///     ruver::handle_connection(stream);
243/// }
244/// ```
245pub fn handle_connection(mut stream: TcpStream) {
246    let mut buffer = [0; 1024];
247
248    let mut raw_request = String::new();
249
250    stream
251        .set_read_timeout(Some(Duration::from_secs(5)))
252        .unwrap_or_else(|error| {
253            eprintln!("set_read_timeout call failed:\n{}", error.description())
254        });
255
256    stream
257        .set_write_timeout(Some(Duration::from_secs(5)))
258        .unwrap_or_else(|error| {
259            eprintln!("set_write_timeout call failed:\n{}", error.description())
260        });
261
262    loop {
263        let buffer_num = match stream.read(&mut buffer) {
264            Ok(t) => (t),
265            Err(error) => {
266                eprintln!(
267                    "Caught an error when reading from TCP stream:\n{}",
268                    error.description()
269                );
270                return;
271            }
272        };
273
274        if buffer_num > 0 {
275            raw_request.push_str(&String::from_utf8(buffer[..buffer_num].to_vec()).unwrap());
276        }
277
278        if raw_request.ends_with("\r\n\r\n") {
279            break;
280        }
281
282        sleep(Duration::from_millis(1));
283    }
284
285    let request = parser(raw_request);
286
287    let mut responder = Responder::new(stream);
288
289    match checker(&request) {
290        StatusCode::OK => {
291            let resource = format!(".{}", &request.resource);
292
293            let mut file = File::open(resource).unwrap();
294
295            let content_length = match file.metadata() {
296                Ok(t) => t.len(),
297                Err(error) => {
298                    eprintln!("Fail to read file:\n{}", error);
299                    return;
300                }
301            };
302
303            let head = format!(
304                "HTTP/1.0 200 OK\r\nContent-Length: {}\r\n\r\n",
305                content_length,
306            );
307
308            let head = head.as_bytes();
309
310            responder.write(head);
311
312            let mut buffer = [0; 1024];
313
314            loop {
315                let size = match file.read(&mut buffer) {
316                    Ok(t) => t,
317                    Err(error) => {
318                        eprintln!("Fail to read file:\n{}", error);
319                        return;
320                    }
321                };
322                if size > 0 {
323                    responder.write(&buffer);
324                } else {
325                    break;
326                }
327            }
328        }
329        StatusCode::NotFound => {
330            responder.write("HTTP/1.0 404 Not Found\r\n\r\n".as_bytes());
331        }
332        StatusCode::InternalServerError => {
333            responder.write("HTTP/1.0 500 Internal Server Error\r\n\r\n".as_bytes());
334        }
335    };
336}