makepad_http/
server.rs

1// this webserver is serving our site. Why? WHYYY. Because it was fun to write. And MUCH faster and MUCH simpler than anything else imaginable.
2
3use std::net::{TcpListener, TcpStream, SocketAddr, Shutdown};
4use std::io::prelude::*;
5use std::sync::{mpsc, mpsc::{RecvTimeoutError}};
6use std::time::Duration;
7pub use crate::websocket::{SERVER_WEB_SOCKET_PONG_MESSAGE, ServerWebSocket, ServerWebSocketMessage, ServerWebSocketMessageFormat, ServerWebSocketMessageHeader, SERVER_WEB_SOCKET_PING_MESSAGE};
8use crate::utils::*;
9
10#[derive(Clone)]
11pub struct HttpServer {
12    pub listen_address: SocketAddr,
13    pub request: mpsc::Sender<HttpServerRequest>,
14    pub post_max_size: u64
15}
16
/// Response produced by the application for a GET or POST request.
///
/// The connection handlers write `header` to the socket first, immediately
/// followed by `body`, and then shut the connection down.
pub struct HttpServerResponse {
    /// Text written to the socket before the body, as raw bytes.
    pub header: String,
    /// Payload bytes written to the socket after `header`.
    pub body: Vec<u8>,
}
21
22pub enum HttpServerRequest {
23    ConnectWebSocket {
24        web_socket_id: u64,
25        headers:HttpServerHeaders,
26        response_sender: mpsc::Sender<Vec<u8 >>,
27    },
28    DisconnectWebSocket {
29        web_socket_id: u64,
30    },
31    BinaryMessage {
32        web_socket_id: u64,
33        response_sender: mpsc::Sender<Vec<u8 >>,
34        data: Vec<u8>
35    },
36    Get {
37        headers: HttpServerHeaders,
38        response_sender: mpsc::Sender<HttpServerResponse>,
39    },
40    Post {
41        headers: HttpServerHeaders,
42        body: Vec<u8>,
43        response: mpsc::Sender<HttpServerResponse>,
44    }
45}
46
47pub fn start_http_server(
48    http_server: HttpServer,
49) -> Option<std::thread::JoinHandle<() >> {
50    
51    let listener = if let Ok(listener) = TcpListener::bind(http_server.listen_address) {listener} else {println!("Cannot bind http server port"); return None};
52    
53    let listen_thread = {
54        std::thread::spawn(move || {
55            let mut connection_counter = 0u64;
56            for tcp_stream in listener.incoming() {
57                let mut tcp_stream = if let Ok(tcp_stream) = tcp_stream {
58                    tcp_stream
59                }
60                else {
61                    println!("Incoming stream failure");
62                    continue
63                };
64                let http_server = http_server.clone();
65                connection_counter += 1;
66                let _read_thread = std::thread::spawn(move || {
67                    
68                    let headers = HttpServerHeaders::from_tcp_stream(&mut tcp_stream);
69                    if headers.is_none() {
70                        return http_error_out(tcp_stream, 500);
71                    }
72                    let headers = headers.unwrap();
73                    
74                    if headers.sec_websocket_key.is_some() {
75                        return handle_web_socket(http_server, tcp_stream, headers, connection_counter);
76                    }
77                    if headers.verb == "POST" {
78                        return handle_post(http_server, tcp_stream, headers);
79                    }
80                    if headers.verb == "GET" {
81                        return handle_get(http_server, tcp_stream, headers);
82                    }
83                    http_error_out(tcp_stream, 500)
84                });
85            }
86        })
87    };
88    Some(listen_thread)
89}
90
91fn handle_post(http_server: HttpServer, mut tcp_stream: TcpStream, headers: HttpServerHeaders) {
92    // we have to have a content-length or bust
93    if headers.content_length.is_none() {
94        return http_error_out(tcp_stream, 500);
95    }
96    let content_length = headers.content_length.unwrap();
97    if content_length > http_server.post_max_size {
98        return http_error_out(tcp_stream, 500);
99    }
100    let bytes_total = content_length as usize;
101    let mut body = Vec::new();
102    body.resize(bytes_total, 0u8);
103    
104    let mut bytes_left = bytes_total;
105    while bytes_left > 0 {
106        let buf = &mut body[(bytes_total - bytes_left)..bytes_total];
107        let bytes_read = tcp_stream.read(buf);
108        if bytes_read.is_err() {
109            return http_error_out(tcp_stream, 500);
110        }
111        let bytes_read = bytes_read.unwrap();
112        if bytes_read == 0 {
113            return http_error_out(tcp_stream, 500);
114        }
115        bytes_left -= bytes_read;
116    }
117    
118    let (tx_socket, rx_socket) = mpsc::channel::<HttpServerResponse> ();
119    if http_server.request.send(HttpServerRequest::Post {
120        headers,
121        body,
122        response: tx_socket
123    }).is_err() {
124        return http_error_out(tcp_stream, 500);
125    };
126    
127    if let Ok(response) = rx_socket.recv() {
128        write_bytes_to_tcp_stream_no_error(&mut tcp_stream, response.header.as_bytes());
129        write_bytes_to_tcp_stream_no_error(&mut tcp_stream, &response.body);
130    }
131    let _ = tcp_stream.shutdown(Shutdown::Both);
132}
133
134fn handle_web_socket(http_server: HttpServer, mut tcp_stream: TcpStream, headers: HttpServerHeaders, web_socket_id: u64) {
135    let upgrade_response = ServerWebSocket::create_upgrade_response(headers.sec_websocket_key.as_ref().unwrap());
136
137    write_bytes_to_tcp_stream_no_error(&mut tcp_stream, upgrade_response.as_bytes());
138    
139    let mut write_tcp_stream = tcp_stream.try_clone().unwrap();
140    let (tx_socket, rx_socket) = mpsc::channel::<Vec<u8 >> ();
141    
142    let _write_thread = std::thread::spawn(move || {
143        // xx
144        loop{
145            match rx_socket.recv_timeout(Duration::from_millis(2000)){
146                Ok(data)=>{
147                    if data.is_empty(){
148                        break
149                    }
150                    let header = ServerWebSocketMessageHeader::from_len(data.len(), ServerWebSocketMessageFormat::Binary, false);
151                    write_bytes_to_tcp_stream_no_error(&mut write_tcp_stream, header.as_slice());
152                    write_bytes_to_tcp_stream_no_error(&mut write_tcp_stream, &data);
153                },
154                Err(RecvTimeoutError::Timeout)=>{ 
155                    write_bytes_to_tcp_stream_no_error(&mut write_tcp_stream, &SERVER_WEB_SOCKET_PING_MESSAGE);
156                }
157                Err(RecvTimeoutError::Disconnected)=>{
158                    break
159                }
160            }
161        }
162        let _ = write_tcp_stream.shutdown(Shutdown::Both);
163    });
164    
165    if http_server.request.send(HttpServerRequest::ConnectWebSocket {
166        headers,
167        web_socket_id,
168        response_sender: tx_socket.clone()
169    }).is_err() {
170        let _ = tcp_stream.shutdown(Shutdown::Both);
171        return
172    };
173    
174    let mut web_socket = ServerWebSocket::new();
175    loop {
176        let mut data = [0u8; 65535];
177        match tcp_stream.read(&mut data) {
178            Ok(n) => {
179                if n == 0 {
180                    let _ = tcp_stream.shutdown(Shutdown::Both);
181                    let _ = tx_socket.send(Vec::new());
182                    break 
183                }
184                web_socket.parse(&data[0..n], | result | {
185                    match result {
186                        Ok(ServerWebSocketMessage::Ping(_)) => {
187                            let _ = tx_socket.send(SERVER_WEB_SOCKET_PONG_MESSAGE.to_vec());
188                        },
189                        Ok(ServerWebSocketMessage::Pong(_)) => {
190                        },
191                        Ok(ServerWebSocketMessage::Text(_text)) => {
192                        }
193                        Ok(ServerWebSocketMessage::Binary(data)) => {
194                            if http_server.request.send(HttpServerRequest::BinaryMessage {
195                                web_socket_id,
196                                response_sender: tx_socket.clone(),
197                                data: data.to_vec(),
198                            }).is_err() {
199                                eprintln!("Websocket message deserialize error");
200                                let _ = tcp_stream.shutdown(Shutdown::Both);
201                                let _ = tx_socket.send(Vec::new());
202                            };
203                        },
204                        Ok(ServerWebSocketMessage::Close) => {
205                            let _ = tcp_stream.shutdown(Shutdown::Both);
206                        }
207                        Err(e) => {
208                            eprintln!("Websocket error {:?}", e);
209                            let _ = tcp_stream.shutdown(Shutdown::Both);
210                            let _ = tx_socket.send(Vec::new());
211                        }
212                    }
213                });
214            }
215            Err(_) => {
216                println!("Websocket closed");
217                let _ = tcp_stream.shutdown(Shutdown::Both);
218                let _ = tx_socket.send(Vec::new());
219                break;
220            }
221        }
222    }
223    
224    let _ =  http_server.request.send(HttpServerRequest::DisconnectWebSocket {
225        web_socket_id,
226    });
227}
228
229fn handle_get(http_server: HttpServer, mut tcp_stream: TcpStream, headers: HttpServerHeaders) {
230    // send our channel the post
231    let (tx_socket, rx_socket) = mpsc::channel::<HttpServerResponse> ();
232    if http_server.request.send(HttpServerRequest::Get {
233        headers,
234        response_sender: tx_socket
235    }).is_err() {
236        return http_error_out(tcp_stream, 500);
237    };
238    
239    if let Ok(response) = rx_socket.recv() {
240        write_bytes_to_tcp_stream_no_error(&mut tcp_stream, response.header.as_bytes());
241        write_bytes_to_tcp_stream_no_error(&mut tcp_stream, &response.body);
242    }
243    let _ = tcp_stream.shutdown(Shutdown::Both);
244}