makepad_http/
server.rs

1// this webserver is serving our site. Why? WHYYY. Because it was fun to write. And MUCH faster and MUCH simpler than anything else imaginable.
2
3use std::net::{TcpListener, TcpStream, SocketAddr, Shutdown};
4use std::io::prelude::*;
5use std::sync::{mpsc, mpsc::{RecvTimeoutError}};
6use std::time::Duration;
7
8use crate::websocket::{WebSocket, WebSocketMessage, BinaryMessageHeader, PING_MESSAGE};
9use crate::utils::*;
10
11#[derive(Clone)]
12pub struct HttpServer {
13    pub listen_address: SocketAddr,
14    pub request: mpsc::Sender<HttpServerRequest>,
15    pub post_max_size: u64
16}
17
/// Reply produced by the application for a GET or POST request.
///
/// The server writes `header` to the socket first and `body` directly after
/// it, without adding anything in between.
// NOTE(review): `header` presumably has to contain the complete status line
// and header block including the terminating blank line — confirm with callers.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct HttpServerResponse {
    /// Text written to the socket before the body.
    pub header: String,
    /// Raw bytes written to the socket after the header.
    pub body: Vec<u8>,
}
22
23pub enum HttpServerRequest {
24    ConnectWebSocket {
25        web_socket_id: u64,
26        headers:HttpServerHeaders,
27        response_sender: mpsc::Sender<Vec<u8 >>,
28    },
29    DisconnectWebSocket {
30        web_socket_id: u64,
31    },
32    BinaryMessage {
33        web_socket_id: u64,
34        response_sender: mpsc::Sender<Vec<u8 >>,
35        data: Vec<u8>
36    },
37    Get {
38        headers: HttpServerHeaders,
39        response_sender: mpsc::Sender<HttpServerResponse>,
40    },
41    Post {
42        headers: HttpServerHeaders,
43        body: Vec<u8>,
44        response: mpsc::Sender<HttpServerResponse>,
45    }
46}
47
48pub fn start_http_server(
49    http_server: HttpServer,
50) -> Option<std::thread::JoinHandle<() >> {
51    
52    let listener = if let Ok(listener) = TcpListener::bind(http_server.listen_address) {listener} else {println!("Cannot bind http server port"); return None};
53    
54    let listen_thread = {
55        std::thread::spawn(move || {
56            let mut connection_counter = 0u64;
57            for tcp_stream in listener.incoming() {
58                let mut tcp_stream = if let Ok(tcp_stream) = tcp_stream {
59                    tcp_stream
60                }
61                else {
62                    println!("Incoming stream failure");
63                    continue
64                };
65                let http_server = http_server.clone();
66                connection_counter += 1;
67                let _read_thread = std::thread::spawn(move || {
68                    
69                    let headers = HttpServerHeaders::from_tcp_stream(&mut tcp_stream);
70                    if headers.is_none() {
71                        return http_error_out(tcp_stream, 500);
72                    }
73                    let headers = headers.unwrap();
74                    
75                    if headers.sec_websocket_key.is_some() {
76                        return handle_web_socket(http_server, tcp_stream, headers, connection_counter);
77                    }
78                    if headers.verb == "POST" {
79                        return handle_post(http_server, tcp_stream, headers);
80                    }
81                    if headers.verb == "GET" {
82                        return handle_get(http_server, tcp_stream, headers);
83                    }
84                    http_error_out(tcp_stream, 500)
85                });
86            }
87        })
88    };
89    Some(listen_thread)
90}
91
92fn handle_post(http_server: HttpServer, mut tcp_stream: TcpStream, headers: HttpServerHeaders) {
93    // we have to have a content-length or bust
94    if headers.content_length.is_none() {
95        return http_error_out(tcp_stream, 500);
96    }
97    let content_length = headers.content_length.unwrap();
98    if content_length > http_server.post_max_size {
99        return http_error_out(tcp_stream, 500);
100    }
101    let bytes_total = content_length as usize;
102    let mut body = Vec::new();
103    body.resize(bytes_total, 0u8);
104    
105    let mut bytes_left = bytes_total;
106    while bytes_left > 0 {
107        let buf = &mut body[(bytes_total - bytes_left)..bytes_total];
108        let bytes_read = tcp_stream.read(buf);
109        if bytes_read.is_err() {
110            return http_error_out(tcp_stream, 500);
111        }
112        let bytes_read = bytes_read.unwrap();
113        if bytes_read == 0 {
114            return http_error_out(tcp_stream, 500);
115        }
116        bytes_left -= bytes_read;
117    }
118    
119    let (tx_socket, rx_socket) = mpsc::channel::<HttpServerResponse> ();
120    if http_server.request.send(HttpServerRequest::Post {
121        headers,
122        body,
123        response: tx_socket
124    }).is_err() {
125        return http_error_out(tcp_stream, 500);
126    };
127    
128    if let Ok(response) = rx_socket.recv() {
129        write_bytes_to_tcp_stream_no_error(&mut tcp_stream, response.header.as_bytes());
130        write_bytes_to_tcp_stream_no_error(&mut tcp_stream, &response.body);
131    }
132    let _ = tcp_stream.shutdown(Shutdown::Both);
133}
134
135fn handle_web_socket(http_server: HttpServer, mut tcp_stream: TcpStream, headers: HttpServerHeaders, web_socket_id: u64) {
136    let upgrade_response = WebSocket::create_upgrade_response(headers.sec_websocket_key.as_ref().unwrap());
137
138    write_bytes_to_tcp_stream_no_error(&mut tcp_stream, upgrade_response.as_bytes());
139    
140    let mut write_tcp_stream = tcp_stream.try_clone().unwrap();
141    let (tx_socket, rx_socket) = mpsc::channel::<Vec<u8 >> ();
142    
143    let _write_thread = std::thread::spawn(move || {
144        // xx
145        loop{
146            match rx_socket.recv_timeout(Duration::from_millis(2000)){
147                Ok(data)=>{
148                    if data.is_empty(){
149                        println!("Write socket closed");
150                        break
151                    }
152                    let header = BinaryMessageHeader::from_len(data.len());
153                    write_bytes_to_tcp_stream_no_error(&mut write_tcp_stream, header.as_slice());
154                    write_bytes_to_tcp_stream_no_error(&mut write_tcp_stream, &data);
155                },
156                Err(RecvTimeoutError::Timeout)=>{ 
157                    write_bytes_to_tcp_stream_no_error(&mut write_tcp_stream, &PING_MESSAGE);
158                }
159                Err(RecvTimeoutError::Disconnected)=>{
160                    println!("Write socket closed");
161                    break
162                }
163            }
164        }
165        let _ = write_tcp_stream.shutdown(Shutdown::Both);
166    });
167    
168    if http_server.request.send(HttpServerRequest::ConnectWebSocket {
169        headers,
170        web_socket_id,
171        response_sender: tx_socket.clone()
172    }).is_err() {
173        let _ = tcp_stream.shutdown(Shutdown::Both);
174        return
175    };
176    
177    let mut web_socket = WebSocket::new();
178    loop {
179        let mut data = [0u8; 65535];
180        match tcp_stream.read(&mut data) {
181            Ok(n) => {
182                if n == 0 {
183                    println!("Websocket closed");
184                    let _ = tcp_stream.shutdown(Shutdown::Both);
185                    let _ = tx_socket.send(Vec::new());
186                    break 
187                }
188                web_socket.parse(&data[0..n], | result | {
189                    match result {
190                        Ok(WebSocketMessage::Ping(_)) => {
191                        },
192                        Ok(WebSocketMessage::Pong(_)) => {
193                        },
194                        Ok(WebSocketMessage::Text(_text)) => {
195                            println!("Websocket text");
196                        }
197                        Ok(WebSocketMessage::Binary(data)) => {
198                            if http_server.request.send(HttpServerRequest::BinaryMessage {
199                                web_socket_id,
200                                response_sender: tx_socket.clone(),
201                                data: data.to_vec(),
202                            }).is_err() {
203                                eprintln!("Websocket message deserialize error");
204                                let _ = tcp_stream.shutdown(Shutdown::Both);
205                                let _ = tx_socket.send(Vec::new());
206                            };
207                        },
208                        Ok(WebSocketMessage::Close) => {
209                            let _ = tcp_stream.shutdown(Shutdown::Both);
210                        }
211                        Err(e) => {
212                            eprintln!("Websocket error {:?}", e);
213                            let _ = tcp_stream.shutdown(Shutdown::Both);
214                            let _ = tx_socket.send(Vec::new());
215                        }
216                    }
217                });
218            }
219            Err(_) => {
220                println!("Websocket closed");
221                let _ = tcp_stream.shutdown(Shutdown::Both);
222                let _ = tx_socket.send(Vec::new());
223                break;
224            }
225        }
226    }
227    
228    let _ =  http_server.request.send(HttpServerRequest::DisconnectWebSocket {
229        web_socket_id,
230    });
231}
232
233fn handle_get(http_server: HttpServer, mut tcp_stream: TcpStream, headers: HttpServerHeaders) {
234    // send our channel the post
235    let (tx_socket, rx_socket) = mpsc::channel::<HttpServerResponse> ();
236    if http_server.request.send(HttpServerRequest::Get {
237        headers,
238        response_sender: tx_socket
239    }).is_err() {
240        return http_error_out(tcp_stream, 500);
241    };
242    
243    if let Ok(response) = rx_socket.recv() {
244        write_bytes_to_tcp_stream_no_error(&mut tcp_stream, response.header.as_bytes());
245        write_bytes_to_tcp_stream_no_error(&mut tcp_stream, &response.body);
246    }
247    let _ = tcp_stream.shutdown(Shutdown::Both);
248}