1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
use std::collections::HashMap;
use std::io::Write;
use std::net::{TcpListener, TcpStream};
use std::sync::mpsc;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
mod http_parser;
pub mod request;
pub mod response;

pub struct Spot {
    threads: Vec<std::thread::JoinHandle<()>>,
    stream_locks: Vec<Arc<(Mutex<bool>, Condvar)>>,
    amount_of_threads: u32,
    routes: HashMap<String, fn(request::Request, response::Response) -> response::Response>,
}

impl Spot {
    pub fn new(amount_of_threads: u32) -> Spot {
        return Spot {
            threads: Vec::new(),
            stream_locks: Vec::new(),
            amount_of_threads: amount_of_threads,
            routes: HashMap::new(),
        };
    }

    pub fn route(
        &mut self,
        path: &str,
        function: fn(request::Request, response::Response) -> response::Response,
    ) {
        if self.routes.contains_key(path) {
            println!("Warning: Route defined twice, using last definition");
            self.routes.remove(path);
        }
        self.routes.insert(path.to_owned(), function);
    }

    pub fn bind(&mut self, ip: &str) -> Result<bool, String> {
        let listener = match TcpListener::bind(ip) {
            Ok(result) => result,
            Err(error) => {
                return Err(String::from(format!("Failed to bind to ip: {}", error)));
            }
        };

        let mut senders = Vec::new();

        for i in 0..self.amount_of_threads {
            // Communication channel
            let (sender, receiver) = mpsc::channel();
            senders.push(sender);

            let stream_lock = Arc::new((Mutex::new(true), Condvar::new()));
            let stream_lock_clone = stream_lock.clone();
            self.stream_locks.push(stream_lock);
            let routes_clone = self.routes.clone();
            let new_worker = thread::Builder::new()
                .name(format!("Spot-Worker-{}", i + 1))
                .spawn(move || loop {
                    let (lock, condvar) = &*stream_lock_clone;
                    let mut waiting = lock.lock().unwrap();
                    while *waiting {
                        waiting = condvar.wait(waiting).unwrap();
                    }
                    let mut stream: TcpStream = match receiver.recv() {
                        Ok(stream) => stream,
                        Err(error) => {
                            println!("Error: {}", error);
                            continue;
                        }
                    };
                    let result = http_parser::HttpParser::parse(&stream);
                    let request = match result {
                        Ok(request) => request,
                        Err(error) => {
                            println!("Error: {}", error);
                            continue;
                        }
                    };
                    let mut response = response::Response::new(String::new(), HashMap::new());
                    response.header("content-type", "text/html; charset=UTF-8");
                    if routes_clone.contains_key(&request.url) {
                        response = routes_clone[&request.url](request, response);
                    }
                    let five_seconds = Duration::new(5, 0);
                    stream
                        .set_write_timeout(Some(five_seconds))
                        .expect("set_write_timeout call failed");
                    match stream.write(response.to_http().as_bytes()) {
                        Ok(_) => println!("Response sent"),
                        Err(e) => println!("Failed sending response: {}", e),
                    }
                    *waiting = true;
                });
            match new_worker {
                Ok(thread) => self.threads.push(thread),
                Err(error) => {
                    println!("Spot: Failed to start thread {}, ERROR: {}", i + 1, error);
                }
            }
        }

        // Stream distributor
        for stream in listener.incoming() {
            let stream = match stream {
                Ok(stream) => stream,
                Err(error) => {
                    println!("Tcp error: {}", error);
                    return Err(String::from(format!("Tcp error: {}", error)));
                }
            };
            let mut done = false;
            let mut index = 0;
            while !done {
                index = 0;
                for stream_lock in self.stream_locks.clone() {
                    let arc = &*stream_lock;
                    let mut waiting = arc.0.try_lock();
                    let mut free = false;
                    if let Ok(ref mut mutex) = waiting {
                        **mutex = false;
                        free = true;
                    } else {
                        println!("waiting try_lock failed");
                    }

                    if free {
                        done = true;
                        arc.1.notify_one();
                        break;
                    } else {
                        index += 1;
                    }
                }
            }
            let _result = senders[index].send(stream);
        }
        // Should be unreachable
        Ok(true)
    }
}